1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19//go:generate protoc --go_out=plugins=grpc:. codec_perf/perf.proto
20//go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
21
22package test
23
24import (
25	"bytes"
26	"context"
27	"crypto/tls"
28	"errors"
29	"flag"
30	"fmt"
31	"io"
32	"math"
33	"net"
34	"net/http"
35	"os"
36	"reflect"
37	"runtime"
38	"strings"
39	"sync"
40	"sync/atomic"
41	"syscall"
42	"testing"
43	"time"
44
45	"github.com/golang/protobuf/proto"
46	anypb "github.com/golang/protobuf/ptypes/any"
47	"golang.org/x/net/http2"
48	spb "google.golang.org/genproto/googleapis/rpc/status"
49	"google.golang.org/grpc"
50	"google.golang.org/grpc/balancer/roundrobin"
51	"google.golang.org/grpc/codes"
52	"google.golang.org/grpc/connectivity"
53	"google.golang.org/grpc/credentials"
54	_ "google.golang.org/grpc/encoding/gzip"
55	_ "google.golang.org/grpc/grpclog/glogger"
56	"google.golang.org/grpc/health"
57	healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
58	healthpb "google.golang.org/grpc/health/grpc_health_v1"
59	"google.golang.org/grpc/internal/channelz"
60	"google.golang.org/grpc/internal/grpcsync"
61	"google.golang.org/grpc/internal/leakcheck"
62	"google.golang.org/grpc/internal/testutils"
63	"google.golang.org/grpc/keepalive"
64	"google.golang.org/grpc/metadata"
65	"google.golang.org/grpc/peer"
66	"google.golang.org/grpc/resolver"
67	"google.golang.org/grpc/resolver/manual"
68	_ "google.golang.org/grpc/resolver/passthrough"
69	"google.golang.org/grpc/stats"
70	"google.golang.org/grpc/status"
71	"google.golang.org/grpc/tap"
72	testpb "google.golang.org/grpc/test/grpc_testing"
73	"google.golang.org/grpc/testdata"
74)
75
76func init() {
77	channelz.TurnOn()
78}
79
80var (
81	// For headers:
82	testMetadata = metadata.MD{
83		"key1":     []string{"value1"},
84		"key2":     []string{"value2"},
85		"key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})},
86	}
87	testMetadata2 = metadata.MD{
88		"key1": []string{"value12"},
89		"key2": []string{"value22"},
90	}
91	// For trailers:
92	testTrailerMetadata = metadata.MD{
93		"tkey1":     []string{"trailerValue1"},
94		"tkey2":     []string{"trailerValue2"},
95		"tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})},
96	}
97	testTrailerMetadata2 = metadata.MD{
98		"tkey1": []string{"trailerValue12"},
99		"tkey2": []string{"trailerValue22"},
100	}
101	// capital "Key" is illegal in HTTP/2.
102	malformedHTTP2Metadata = metadata.MD{
103		"Key": []string{"foo"},
104	}
105	testAppUA     = "myApp1/1.0 myApp2/0.9"
106	failAppUA     = "fail-this-RPC"
107	detailedError = status.ErrorProto(&spb.Status{
108		Code:    int32(codes.DataLoss),
109		Message: "error for testing: " + failAppUA,
110		Details: []*anypb.Any{{
111			TypeUrl: "url",
112			Value:   []byte{6, 0, 0, 6, 1, 3},
113		}},
114	})
115)
116
117var raceMode bool // set by race.go in race mode
118
119type testServer struct {
120	security           string // indicate the authentication protocol used by this server.
121	earlyFail          bool   // whether to error out the execution of a service handler prematurely.
122	setAndSendHeader   bool   // whether to call setHeader and sendHeader.
123	setHeaderOnly      bool   // whether to only call setHeader, not sendHeader.
124	multipleSetTrailer bool   // whether to call setTrailer multiple times.
125	unaryCallSleepTime time.Duration
126}
127
128func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
129	if md, ok := metadata.FromIncomingContext(ctx); ok {
130		// For testing purpose, returns an error if user-agent is failAppUA.
131		// To test that client gets the correct error.
132		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
133			return nil, detailedError
134		}
135		var str []string
136		for _, entry := range md["user-agent"] {
137			str = append(str, "ua", entry)
138		}
139		grpc.SendHeader(ctx, metadata.Pairs(str...))
140	}
141	return new(testpb.Empty), nil
142}
143
144func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
145	if size < 0 {
146		return nil, fmt.Errorf("Requested a response with invalid length %d", size)
147	}
148	body := make([]byte, size)
149	switch t {
150	case testpb.PayloadType_COMPRESSABLE:
151	case testpb.PayloadType_UNCOMPRESSABLE:
152		return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported")
153	default:
154		return nil, fmt.Errorf("Unsupported payload type: %d", t)
155	}
156	return &testpb.Payload{
157		Type: t,
158		Body: body,
159	}, nil
160}
161
162func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
163	md, ok := metadata.FromIncomingContext(ctx)
164	if ok {
165		if _, exists := md[":authority"]; !exists {
166			return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
167		}
168		if s.setAndSendHeader {
169			if err := grpc.SetHeader(ctx, md); err != nil {
170				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
171			}
172			if err := grpc.SendHeader(ctx, testMetadata2); err != nil {
173				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err)
174			}
175		} else if s.setHeaderOnly {
176			if err := grpc.SetHeader(ctx, md); err != nil {
177				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
178			}
179			if err := grpc.SetHeader(ctx, testMetadata2); err != nil {
180				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err)
181			}
182		} else {
183			if err := grpc.SendHeader(ctx, md); err != nil {
184				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
185			}
186		}
187		if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
188			return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
189		}
190		if s.multipleSetTrailer {
191			if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil {
192				return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err)
193			}
194		}
195	}
196	pr, ok := peer.FromContext(ctx)
197	if !ok {
198		return nil, status.Error(codes.DataLoss, "failed to get peer from ctx")
199	}
200	if pr.Addr == net.Addr(nil) {
201		return nil, status.Error(codes.DataLoss, "failed to get peer address")
202	}
203	if s.security != "" {
204		// Check Auth info
205		var authType, serverName string
206		switch info := pr.AuthInfo.(type) {
207		case credentials.TLSInfo:
208			authType = info.AuthType()
209			serverName = info.State.ServerName
210		default:
211			return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type")
212		}
213		if authType != s.security {
214			return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security)
215		}
216		if serverName != "x.test.youtube.com" {
217			return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName)
218		}
219	}
220	// Simulate some service delay.
221	time.Sleep(s.unaryCallSleepTime)
222
223	payload, err := newPayload(in.GetResponseType(), in.GetResponseSize())
224	if err != nil {
225		return nil, err
226	}
227
228	return &testpb.SimpleResponse{
229		Payload: payload,
230	}, nil
231}
232
233func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
234	if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
235		if _, exists := md[":authority"]; !exists {
236			return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
237		}
238		// For testing purpose, returns an error if user-agent is failAppUA.
239		// To test that client gets the correct error.
240		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
241			return status.Error(codes.DataLoss, "error for testing: "+failAppUA)
242		}
243	}
244	cs := args.GetResponseParameters()
245	for _, c := range cs {
246		if us := c.GetIntervalUs(); us > 0 {
247			time.Sleep(time.Duration(us) * time.Microsecond)
248		}
249
250		payload, err := newPayload(args.GetResponseType(), c.GetSize())
251		if err != nil {
252			return err
253		}
254
255		if err := stream.Send(&testpb.StreamingOutputCallResponse{
256			Payload: payload,
257		}); err != nil {
258			return err
259		}
260	}
261	return nil
262}
263
264func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
265	var sum int
266	for {
267		in, err := stream.Recv()
268		if err == io.EOF {
269			return stream.SendAndClose(&testpb.StreamingInputCallResponse{
270				AggregatedPayloadSize: int32(sum),
271			})
272		}
273		if err != nil {
274			return err
275		}
276		p := in.GetPayload().GetBody()
277		sum += len(p)
278		if s.earlyFail {
279			return status.Error(codes.NotFound, "not found")
280		}
281	}
282}
283
284func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
285	md, ok := metadata.FromIncomingContext(stream.Context())
286	if ok {
287		if s.setAndSendHeader {
288			if err := stream.SetHeader(md); err != nil {
289				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
290			}
291			if err := stream.SendHeader(testMetadata2); err != nil {
292				return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
293			}
294		} else if s.setHeaderOnly {
295			if err := stream.SetHeader(md); err != nil {
296				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
297			}
298			if err := stream.SetHeader(testMetadata2); err != nil {
299				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
300			}
301		} else {
302			if err := stream.SendHeader(md); err != nil {
303				return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
304			}
305		}
306		stream.SetTrailer(testTrailerMetadata)
307		if s.multipleSetTrailer {
308			stream.SetTrailer(testTrailerMetadata2)
309		}
310	}
311	for {
312		in, err := stream.Recv()
313		if err == io.EOF {
314			// read done.
315			return nil
316		}
317		if err != nil {
318			// to facilitate testSvrWriteStatusEarlyWrite
319			if status.Code(err) == codes.ResourceExhausted {
320				return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
321			}
322			return err
323		}
324		cs := in.GetResponseParameters()
325		for _, c := range cs {
326			if us := c.GetIntervalUs(); us > 0 {
327				time.Sleep(time.Duration(us) * time.Microsecond)
328			}
329
330			payload, err := newPayload(in.GetResponseType(), c.GetSize())
331			if err != nil {
332				return err
333			}
334
335			if err := stream.Send(&testpb.StreamingOutputCallResponse{
336				Payload: payload,
337			}); err != nil {
338				// to facilitate testSvrWriteStatusEarlyWrite
339				if status.Code(err) == codes.ResourceExhausted {
340					return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
341				}
342				return err
343			}
344		}
345	}
346}
347
348func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
349	var msgBuf []*testpb.StreamingOutputCallRequest
350	for {
351		in, err := stream.Recv()
352		if err == io.EOF {
353			// read done.
354			break
355		}
356		if err != nil {
357			return err
358		}
359		msgBuf = append(msgBuf, in)
360	}
361	for _, m := range msgBuf {
362		cs := m.GetResponseParameters()
363		for _, c := range cs {
364			if us := c.GetIntervalUs(); us > 0 {
365				time.Sleep(time.Duration(us) * time.Microsecond)
366			}
367
368			payload, err := newPayload(m.GetResponseType(), c.GetSize())
369			if err != nil {
370				return err
371			}
372
373			if err := stream.Send(&testpb.StreamingOutputCallResponse{
374				Payload: payload,
375			}); err != nil {
376				return err
377			}
378		}
379	}
380	return nil
381}
382
383type env struct {
384	name         string
385	network      string // The type of network such as tcp, unix, etc.
386	security     string // The security protocol such as TLS, SSH, etc.
387	httpHandler  bool   // whether to use the http.Handler ServerTransport; requires TLS
388	balancer     string // One of "round_robin", "pick_first", "v1", or "".
389	customDialer func(string, string, time.Duration) (net.Conn, error)
390}
391
392func (e env) runnable() bool {
393	if runtime.GOOS == "windows" && e.network == "unix" {
394		return false
395	}
396	return true
397}
398
399func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
400	if e.customDialer != nil {
401		return e.customDialer(e.network, addr, timeout)
402	}
403	return net.DialTimeout(e.network, addr, timeout)
404}
405
406var (
407	tcpClearEnv   = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"}
408	tcpTLSEnv     = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"}
409	tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"}
410	tcpTLSRREnv   = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"}
411	handlerEnv    = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"}
412	noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"}
413	allEnv        = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv}
414)
415
416var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
417
418func listTestEnv() (envs []env) {
419	if *onlyEnv != "" {
420		for _, e := range allEnv {
421			if e.name == *onlyEnv {
422				if !e.runnable() {
423					panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS))
424				}
425				return []env{e}
426			}
427		}
428		panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv))
429	}
430	for _, e := range allEnv {
431		if e.runnable() {
432			envs = append(envs, e)
433		}
434	}
435	return envs
436}
437
438// test is an end-to-end test. It should be created with the newTest
439// func, modified as needed, and then started with its startServer method.
440// It should be cleaned up with the tearDown method.
441type test struct {
442	t *testing.T
443	e env
444
445	ctx    context.Context // valid for life of test, before tearDown
446	cancel context.CancelFunc
447
448	// Configurable knobs, after newTest returns:
449	testServer              testpb.TestServiceServer // nil means none
450	healthServer            *health.Server           // nil means disabled
451	maxStream               uint32
452	tapHandle               tap.ServerInHandle
453	maxMsgSize              *int
454	maxClientReceiveMsgSize *int
455	maxClientSendMsgSize    *int
456	maxServerReceiveMsgSize *int
457	maxServerSendMsgSize    *int
458	maxClientHeaderListSize *uint32
459	maxServerHeaderListSize *uint32
460	userAgent               string
461	// clientCompression and serverCompression are set to test the deprecated API
462	// WithCompressor and WithDecompressor.
463	clientCompression bool
464	serverCompression bool
465	// clientUseCompression is set to test the new compressor registration API UseCompressor.
466	clientUseCompression bool
467	// clientNopCompression is set to create a compressor whose type is not supported.
468	clientNopCompression        bool
469	unaryClientInt              grpc.UnaryClientInterceptor
470	streamClientInt             grpc.StreamClientInterceptor
471	unaryServerInt              grpc.UnaryServerInterceptor
472	streamServerInt             grpc.StreamServerInterceptor
473	unknownHandler              grpc.StreamHandler
474	sc                          <-chan grpc.ServiceConfig
475	customCodec                 grpc.Codec
476	serverInitialWindowSize     int32
477	serverInitialConnWindowSize int32
478	clientInitialWindowSize     int32
479	clientInitialConnWindowSize int32
480	perRPCCreds                 credentials.PerRPCCredentials
481	customDialOptions           []grpc.DialOption
482	customServerOptions         []grpc.ServerOption
483	resolverScheme              string
484	cliKeepAlive                *keepalive.ClientParameters
485	svrKeepAlive                *keepalive.ServerParameters
486
487	// All test dialing is blocking by default. Set this to true if dial
488	// should be non-blocking.
489	nonBlockingDial bool
490
491	// srv and srvAddr are set once startServer is called.
492	srv     stopper
493	srvAddr string
494
495	// srvs and srvAddrs are set once startServers is called.
496	srvs     []*grpc.Server
497	srvAddrs []string
498
499	cc          *grpc.ClientConn // nil until requested via clientConn
500	restoreLogs func()           // nil unless declareLogNoise is used
501}
502
503type stopper interface {
504	Stop()
505	GracefulStop()
506}
507
508func (te *test) tearDown() {
509	if te.cancel != nil {
510		te.cancel()
511		te.cancel = nil
512	}
513
514	if te.cc != nil {
515		te.cc.Close()
516		te.cc = nil
517	}
518
519	if te.restoreLogs != nil {
520		te.restoreLogs()
521		te.restoreLogs = nil
522	}
523
524	if te.srv != nil {
525		te.srv.Stop()
526	}
527	if len(te.srvs) != 0 {
528		for _, s := range te.srvs {
529			s.Stop()
530		}
531	}
532}
533
534// newTest returns a new test using the provided testing.T and
535// environment.  It is returned with default values. Tests should
536// modify it before calling its startServer and clientConn methods.
537func newTest(t *testing.T, e env) *test {
538	te := &test{
539		t:         t,
540		e:         e,
541		maxStream: math.MaxUint32,
542	}
543	te.ctx, te.cancel = context.WithCancel(context.Background())
544	return te
545}
546
547func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
548	te.testServer = ts
549	te.t.Logf("Running test in %s environment...", te.e.name)
550	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
551	if te.maxMsgSize != nil {
552		sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
553	}
554	if te.maxServerReceiveMsgSize != nil {
555		sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
556	}
557	if te.maxServerSendMsgSize != nil {
558		sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
559	}
560	if te.maxServerHeaderListSize != nil {
561		sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize))
562	}
563	if te.tapHandle != nil {
564		sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
565	}
566	if te.serverCompression {
567		sopts = append(sopts,
568			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
569			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
570		)
571	}
572	if te.unaryServerInt != nil {
573		sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
574	}
575	if te.streamServerInt != nil {
576		sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
577	}
578	if te.unknownHandler != nil {
579		sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
580	}
581	if te.serverInitialWindowSize > 0 {
582		sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
583	}
584	if te.serverInitialConnWindowSize > 0 {
585		sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
586	}
587	la := "localhost:0"
588	switch te.e.network {
589	case "unix":
590		la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
591		syscall.Unlink(la)
592	}
593	lis, err := listen(te.e.network, la)
594	if err != nil {
595		te.t.Fatalf("Failed to listen: %v", err)
596	}
597	switch te.e.security {
598	case "tls":
599		creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
600		if err != nil {
601			te.t.Fatalf("Failed to generate credentials %v", err)
602		}
603		sopts = append(sopts, grpc.Creds(creds))
604	case "clientTimeoutCreds":
605		sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
606	}
607	if te.customCodec != nil {
608		sopts = append(sopts, grpc.CustomCodec(te.customCodec))
609	}
610	if te.svrKeepAlive != nil {
611		sopts = append(sopts, grpc.KeepaliveParams(*te.svrKeepAlive))
612	}
613	sopts = append(sopts, te.customServerOptions...)
614	s := grpc.NewServer(sopts...)
615	te.srv = s
616	if te.healthServer != nil {
617		healthgrpc.RegisterHealthServer(s, te.healthServer)
618	}
619	if te.testServer != nil {
620		testpb.RegisterTestServiceServer(s, te.testServer)
621	}
622	addr := la
623	switch te.e.network {
624	case "unix":
625	default:
626		_, port, err := net.SplitHostPort(lis.Addr().String())
627		if err != nil {
628			te.t.Fatalf("Failed to parse listener address: %v", err)
629		}
630		addr = "localhost:" + port
631	}
632
633	te.srvAddr = addr
634
635	if te.e.httpHandler {
636		if te.e.security != "tls" {
637			te.t.Fatalf("unsupported environment settings")
638		}
639		cert, err := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
640		if err != nil {
641			te.t.Fatal("Error creating TLS certificate: ", err)
642		}
643		hs := &http.Server{
644			Handler: s,
645		}
646		err = http2.ConfigureServer(hs, &http2.Server{
647			MaxConcurrentStreams: te.maxStream,
648		})
649		if err != nil {
650			te.t.Fatal("error starting http2 server: ", err)
651		}
652		hs.TLSConfig.Certificates = []tls.Certificate{cert}
653		tlsListener := tls.NewListener(lis, hs.TLSConfig)
654		whs := &wrapHS{Listener: tlsListener, s: hs, conns: make(map[net.Conn]bool)}
655		te.srv = whs
656		go hs.Serve(whs)
657
658		return lis
659	}
660
661	go s.Serve(lis)
662	return lis
663}
664
665// TODO: delete wrapHS and wrapConn when Go1.6 and Go1.7 support are gone and
666// call s.Close and s.Shutdown instead.
667type wrapHS struct {
668	sync.Mutex
669	net.Listener
670	s     *http.Server
671	conns map[net.Conn]bool
672}
673
674func (w *wrapHS) Accept() (net.Conn, error) {
675	c, err := w.Listener.Accept()
676	if err != nil {
677		return nil, err
678	}
679	w.Lock()
680	if w.conns == nil {
681		w.Unlock()
682		c.Close()
683		return nil, errors.New("connection after listener closed")
684	}
685	w.conns[&wrapConn{Conn: c, hs: w}] = true
686	w.Unlock()
687	return c, nil
688}
689
690func (w *wrapHS) Stop() {
691	w.Listener.Close()
692	w.Lock()
693	conns := w.conns
694	w.conns = nil
695	w.Unlock()
696	for c := range conns {
697		c.Close()
698	}
699}
700
701// Poll for now..
702func (w *wrapHS) GracefulStop() {
703	w.Listener.Close()
704	for {
705		w.Lock()
706		l := len(w.conns)
707		w.Unlock()
708		if l == 0 {
709			return
710		}
711		time.Sleep(50 * time.Millisecond)
712	}
713}
714
715type wrapConn struct {
716	net.Conn
717	hs *wrapHS
718}
719
720func (w *wrapConn) Close() error {
721	w.hs.Lock()
722	delete(w.hs.conns, w.Conn)
723	w.hs.Unlock()
724	return w.Conn.Close()
725}
726
727func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper {
728	l := te.listenAndServe(ts, listenWithConnControl)
729	return l.(*listenerWrapper)
730}
731
732// startServer starts a gRPC server listening. Callers should defer a
733// call to te.tearDown to clean up.
734func (te *test) startServer(ts testpb.TestServiceServer) {
735	te.listenAndServe(ts, net.Listen)
736}
737
738type nopCompressor struct {
739	grpc.Compressor
740}
741
742// NewNopCompressor creates a compressor to test the case that type is not supported.
743func NewNopCompressor() grpc.Compressor {
744	return &nopCompressor{grpc.NewGZIPCompressor()}
745}
746
747func (c *nopCompressor) Type() string {
748	return "nop"
749}
750
751type nopDecompressor struct {
752	grpc.Decompressor
753}
754
755// NewNopDecompressor creates a decompressor to test the case that type is not supported.
756func NewNopDecompressor() grpc.Decompressor {
757	return &nopDecompressor{grpc.NewGZIPDecompressor()}
758}
759
760func (d *nopDecompressor) Type() string {
761	return "nop"
762}
763
764func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) {
765	opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent))
766
767	if te.sc != nil {
768		opts = append(opts, grpc.WithServiceConfig(te.sc))
769	}
770
771	if te.clientCompression {
772		opts = append(opts,
773			grpc.WithCompressor(grpc.NewGZIPCompressor()),
774			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
775		)
776	}
777	if te.clientUseCompression {
778		opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
779	}
780	if te.clientNopCompression {
781		opts = append(opts,
782			grpc.WithCompressor(NewNopCompressor()),
783			grpc.WithDecompressor(NewNopDecompressor()),
784		)
785	}
786	if te.unaryClientInt != nil {
787		opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt))
788	}
789	if te.streamClientInt != nil {
790		opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
791	}
792	if te.maxMsgSize != nil {
793		opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize))
794	}
795	if te.maxClientReceiveMsgSize != nil {
796		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
797	}
798	if te.maxClientSendMsgSize != nil {
799		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
800	}
801	if te.maxClientHeaderListSize != nil {
802		opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize))
803	}
804	switch te.e.security {
805	case "tls":
806		creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
807		if err != nil {
808			te.t.Fatalf("Failed to load credentials: %v", err)
809		}
810		opts = append(opts, grpc.WithTransportCredentials(creds))
811	case "clientTimeoutCreds":
812		opts = append(opts, grpc.WithTransportCredentials(&clientTimeoutCreds{}))
813	case "empty":
814		// Don't add any transport creds option.
815	default:
816		opts = append(opts, grpc.WithInsecure())
817	}
818	// TODO(bar) switch balancer case "pick_first".
819	var scheme string
820	if te.resolverScheme == "" {
821		scheme = "passthrough:///"
822	} else {
823		scheme = te.resolverScheme + ":///"
824	}
825	switch te.e.balancer {
826	case "v1":
827		opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
828	case "round_robin":
829		opts = append(opts, grpc.WithBalancerName(roundrobin.Name))
830	}
831	if te.clientInitialWindowSize > 0 {
832		opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
833	}
834	if te.clientInitialConnWindowSize > 0 {
835		opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
836	}
837	if te.perRPCCreds != nil {
838		opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
839	}
840	if te.customCodec != nil {
841		opts = append(opts, grpc.WithDefaultCallOptions(grpc.CallCustomCodec(te.customCodec)))
842	}
843	if !te.nonBlockingDial && te.srvAddr != "" {
844		// Only do a blocking dial if server is up.
845		opts = append(opts, grpc.WithBlock())
846	}
847	if te.srvAddr == "" {
848		te.srvAddr = "client.side.only.test"
849	}
850	if te.cliKeepAlive != nil {
851		opts = append(opts, grpc.WithKeepaliveParams(*te.cliKeepAlive))
852	}
853	opts = append(opts, te.customDialOptions...)
854	return opts, scheme
855}
856
857func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) {
858	if te.cc != nil {
859		return te.cc, nil
860	}
861	opts, scheme := te.configDial()
862	dw := &dialerWrapper{}
863	// overwrite the dialer before
864	opts = append(opts, grpc.WithDialer(dw.dialer))
865	var err error
866	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
867	if err != nil {
868		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
869	}
870	return te.cc, dw
871}
872
873func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn {
874	if te.cc != nil {
875		return te.cc
876	}
877	var scheme string
878	opts, scheme = te.configDial(opts...)
879	var err error
880	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
881	if err != nil {
882		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
883	}
884	return te.cc
885}
886
887func (te *test) declareLogNoise(phrases ...string) {
888	te.restoreLogs = declareLogNoise(te.t, phrases...)
889}
890
891func (te *test) withServerTester(fn func(st *serverTester)) {
892	c, err := te.e.dialer(te.srvAddr, 10*time.Second)
893	if err != nil {
894		te.t.Fatal(err)
895	}
896	defer c.Close()
897	if te.e.security == "tls" {
898		c = tls.Client(c, &tls.Config{
899			InsecureSkipVerify: true,
900			NextProtos:         []string{http2.NextProtoTLS},
901		})
902	}
903	st := newServerTesterFromConn(te.t, c)
904	st.greet()
905	fn(st)
906}
907
908type lazyConn struct {
909	net.Conn
910	beLazy int32
911}
912
913func (l *lazyConn) Write(b []byte) (int, error) {
914	if atomic.LoadInt32(&(l.beLazy)) == 1 {
915		time.Sleep(time.Second)
916	}
917	return l.Conn.Write(b)
918}
919
920func TestContextDeadlineNotIgnored(t *testing.T) {
921	defer leakcheck.Check(t)
922	e := noBalancerEnv
923	var lc *lazyConn
924	e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) {
925		conn, err := net.DialTimeout(network, addr, timeout)
926		if err != nil {
927			return nil, err
928		}
929		lc = &lazyConn{Conn: conn}
930		return lc, nil
931	}
932
933	te := newTest(t, e)
934	te.startServer(&testServer{security: e.security})
935	defer te.tearDown()
936
937	cc := te.clientConn()
938	tc := testpb.NewTestServiceClient(cc)
939	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
940		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
941	}
942	atomic.StoreInt32(&(lc.beLazy), 1)
943	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
944	defer cancel()
945	t1 := time.Now()
946	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
947		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
948	}
949	if time.Since(t1) > 2*time.Second {
950		t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
951	}
952}
953
954func TestTimeoutOnDeadServer(t *testing.T) {
955	defer leakcheck.Check(t)
956	for _, e := range listTestEnv() {
957		testTimeoutOnDeadServer(t, e)
958	}
959}
960
961func testTimeoutOnDeadServer(t *testing.T, e env) {
962	te := newTest(t, e)
963	te.customDialOptions = []grpc.DialOption{grpc.WithWaitForHandshake()}
964	te.userAgent = testAppUA
965	te.declareLogNoise(
966		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
967		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
968		"grpc: addrConn.resetTransport failed to create client transport: connection error",
969	)
970	te.startServer(&testServer{security: e.security})
971	defer te.tearDown()
972
973	cc := te.clientConn()
974	tc := testpb.NewTestServiceClient(cc)
975	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
976		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
977	}
978	te.srv.Stop()
979
980	// Wait for the client to notice the connection is gone.
981	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
982	state := cc.GetState()
983	for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
984	}
985	cancel()
986	if state == connectivity.Ready {
987		t.Fatalf("Timed out waiting for non-ready state")
988	}
989	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
990	_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
991	cancel()
992	if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded {
993		// If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error,
994		// the error will be an internal error.
995		t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
996	}
997	awaitNewConnLogOutput()
998}
999
1000func TestServerGracefulStopIdempotent(t *testing.T) {
1001	defer leakcheck.Check(t)
1002	for _, e := range listTestEnv() {
1003		if e.name == "handler-tls" {
1004			continue
1005		}
1006		testServerGracefulStopIdempotent(t, e)
1007	}
1008}
1009
1010func testServerGracefulStopIdempotent(t *testing.T, e env) {
1011	te := newTest(t, e)
1012	te.userAgent = testAppUA
1013	te.startServer(&testServer{security: e.security})
1014	defer te.tearDown()
1015
1016	for i := 0; i < 3; i++ {
1017		te.srv.GracefulStop()
1018	}
1019}
1020
1021func TestServerGoAway(t *testing.T) {
1022	defer leakcheck.Check(t)
1023	for _, e := range listTestEnv() {
1024		if e.name == "handler-tls" {
1025			continue
1026		}
1027		testServerGoAway(t, e)
1028	}
1029}
1030
1031func testServerGoAway(t *testing.T, e env) {
1032	te := newTest(t, e)
1033	te.userAgent = testAppUA
1034	te.startServer(&testServer{security: e.security})
1035	defer te.tearDown()
1036
1037	cc := te.clientConn()
1038	tc := testpb.NewTestServiceClient(cc)
1039	// Finish an RPC to make sure the connection is good.
1040	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1041		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1042	}
1043	ch := make(chan struct{})
1044	go func() {
1045		te.srv.GracefulStop()
1046		close(ch)
1047	}()
1048	// Loop until the server side GoAway signal is propagated to the client.
1049	for {
1050		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1051		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
1052			cancel()
1053			break
1054		}
1055		cancel()
1056	}
1057	// A new RPC should fail.
1058	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
1059		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
1060	}
1061	<-ch
1062	awaitNewConnLogOutput()
1063}
1064
1065func TestServerGoAwayPendingRPC(t *testing.T) {
1066	defer leakcheck.Check(t)
1067	for _, e := range listTestEnv() {
1068		if e.name == "handler-tls" {
1069			continue
1070		}
1071		testServerGoAwayPendingRPC(t, e)
1072	}
1073}
1074
1075func testServerGoAwayPendingRPC(t *testing.T, e env) {
1076	te := newTest(t, e)
1077	te.userAgent = testAppUA
1078	te.declareLogNoise(
1079		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1080		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1081		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1082	)
1083	te.startServer(&testServer{security: e.security})
1084	defer te.tearDown()
1085
1086	cc := te.clientConn()
1087	tc := testpb.NewTestServiceClient(cc)
1088	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1089	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
1090	if err != nil {
1091		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1092	}
1093	// Finish an RPC to make sure the connection is good.
1094	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1095		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1096	}
1097	ch := make(chan struct{})
1098	go func() {
1099		te.srv.GracefulStop()
1100		close(ch)
1101	}()
1102	// Loop until the server side GoAway signal is propagated to the client.
1103	start := time.Now()
1104	errored := false
1105	for time.Since(start) < time.Second {
1106		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1107		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false))
1108		cancel()
1109		if err != nil {
1110			errored = true
1111			break
1112		}
1113	}
1114	if !errored {
1115		t.Fatalf("GoAway never received by client")
1116	}
1117	respParam := []*testpb.ResponseParameters{{Size: 1}}
1118	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1119	if err != nil {
1120		t.Fatal(err)
1121	}
1122	req := &testpb.StreamingOutputCallRequest{
1123		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1124		ResponseParameters: respParam,
1125		Payload:            payload,
1126	}
1127	// The existing RPC should be still good to proceed.
1128	if err := stream.Send(req); err != nil {
1129		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
1130	}
1131	if _, err := stream.Recv(); err != nil {
1132		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
1133	}
1134	// The RPC will run until canceled.
1135	cancel()
1136	<-ch
1137	awaitNewConnLogOutput()
1138}
1139
1140func TestServerMultipleGoAwayPendingRPC(t *testing.T) {
1141	defer leakcheck.Check(t)
1142	for _, e := range listTestEnv() {
1143		if e.name == "handler-tls" {
1144			continue
1145		}
1146		testServerMultipleGoAwayPendingRPC(t, e)
1147	}
1148}
1149
1150func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
1151	te := newTest(t, e)
1152	te.userAgent = testAppUA
1153	te.declareLogNoise(
1154		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1155		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1156		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1157	)
1158	te.startServer(&testServer{security: e.security})
1159	defer te.tearDown()
1160
1161	cc := te.clientConn()
1162	tc := testpb.NewTestServiceClient(cc)
1163	ctx, cancel := context.WithCancel(context.Background())
1164	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
1165	if err != nil {
1166		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1167	}
1168	// Finish an RPC to make sure the connection is good.
1169	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1170		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1171	}
1172	ch1 := make(chan struct{})
1173	go func() {
1174		te.srv.GracefulStop()
1175		close(ch1)
1176	}()
1177	ch2 := make(chan struct{})
1178	go func() {
1179		te.srv.GracefulStop()
1180		close(ch2)
1181	}()
1182	// Loop until the server side GoAway signal is propagated to the client.
1183	for {
1184		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1185		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1186			cancel()
1187			break
1188		}
1189		cancel()
1190	}
1191	select {
1192	case <-ch1:
1193		t.Fatal("GracefulStop() terminated early")
1194	case <-ch2:
1195		t.Fatal("GracefulStop() terminated early")
1196	default:
1197	}
1198	respParam := []*testpb.ResponseParameters{
1199		{
1200			Size: 1,
1201		},
1202	}
1203	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1204	if err != nil {
1205		t.Fatal(err)
1206	}
1207	req := &testpb.StreamingOutputCallRequest{
1208		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1209		ResponseParameters: respParam,
1210		Payload:            payload,
1211	}
1212	// The existing RPC should be still good to proceed.
1213	if err := stream.Send(req); err != nil {
1214		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
1215	}
1216	if _, err := stream.Recv(); err != nil {
1217		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
1218	}
1219	if err := stream.CloseSend(); err != nil {
1220		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
1221	}
1222	<-ch1
1223	<-ch2
1224	cancel()
1225	awaitNewConnLogOutput()
1226}
1227
1228func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
1229	defer leakcheck.Check(t)
1230	for _, e := range listTestEnv() {
1231		if e.name == "handler-tls" {
1232			continue
1233		}
1234		testConcurrentClientConnCloseAndServerGoAway(t, e)
1235	}
1236}
1237
1238func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
1239	te := newTest(t, e)
1240	te.userAgent = testAppUA
1241	te.declareLogNoise(
1242		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1243		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1244		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1245	)
1246	te.startServer(&testServer{security: e.security})
1247	defer te.tearDown()
1248
1249	cc := te.clientConn()
1250	tc := testpb.NewTestServiceClient(cc)
1251	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1252		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1253	}
1254	ch := make(chan struct{})
1255	// Close ClientConn and Server concurrently.
1256	go func() {
1257		te.srv.GracefulStop()
1258		close(ch)
1259	}()
1260	go func() {
1261		cc.Close()
1262	}()
1263	<-ch
1264}
1265
1266func TestConcurrentServerStopAndGoAway(t *testing.T) {
1267	defer leakcheck.Check(t)
1268	for _, e := range listTestEnv() {
1269		if e.name == "handler-tls" {
1270			continue
1271		}
1272		testConcurrentServerStopAndGoAway(t, e)
1273	}
1274}
1275
1276func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
1277	te := newTest(t, e)
1278	te.userAgent = testAppUA
1279	te.declareLogNoise(
1280		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1281		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1282		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1283	)
1284	te.startServer(&testServer{security: e.security})
1285	defer te.tearDown()
1286
1287	cc := te.clientConn()
1288	tc := testpb.NewTestServiceClient(cc)
1289	stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false))
1290	if err != nil {
1291		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1292	}
1293	// Finish an RPC to make sure the connection is good.
1294	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1295		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1296	}
1297	ch := make(chan struct{})
1298	go func() {
1299		te.srv.GracefulStop()
1300		close(ch)
1301	}()
1302	// Loop until the server side GoAway signal is propagated to the client.
1303	for {
1304		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1305		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
1306			cancel()
1307			break
1308		}
1309		cancel()
1310	}
1311	// Stop the server and close all the connections.
1312	te.srv.Stop()
1313	respParam := []*testpb.ResponseParameters{
1314		{
1315			Size: 1,
1316		},
1317	}
1318	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1319	if err != nil {
1320		t.Fatal(err)
1321	}
1322	req := &testpb.StreamingOutputCallRequest{
1323		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1324		ResponseParameters: respParam,
1325		Payload:            payload,
1326	}
1327	sendStart := time.Now()
1328	for {
1329		if err := stream.Send(req); err == io.EOF {
1330			// stream.Send should eventually send io.EOF
1331			break
1332		} else if err != nil {
1333			// Send should never return a transport-level error.
1334			t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
1335		}
1336		if time.Since(sendStart) > 2*time.Second {
1337			t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
1338		}
1339		time.Sleep(time.Millisecond)
1340	}
1341	if _, err := stream.Recv(); err == nil || err == io.EOF {
1342		t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
1343	}
1344	<-ch
1345	awaitNewConnLogOutput()
1346}
1347
1348func TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
1349	defer leakcheck.Check(t)
1350	for _, e := range listTestEnv() {
1351		if e.name == "handler-tls" {
1352			continue
1353		}
1354		testClientConnCloseAfterGoAwayWithActiveStream(t, e)
1355	}
1356}
1357
1358func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
1359	te := newTest(t, e)
1360	te.startServer(&testServer{security: e.security})
1361	defer te.tearDown()
1362	cc := te.clientConn()
1363	tc := testpb.NewTestServiceClient(cc)
1364
1365	ctx, cancel := context.WithCancel(context.Background())
1366	defer cancel()
1367	if _, err := tc.FullDuplexCall(ctx); err != nil {
1368		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
1369	}
1370	done := make(chan struct{})
1371	go func() {
1372		te.srv.GracefulStop()
1373		close(done)
1374	}()
1375	time.Sleep(50 * time.Millisecond)
1376	cc.Close()
1377	timeout := time.NewTimer(time.Second)
1378	select {
1379	case <-done:
1380	case <-timeout.C:
1381		t.Fatalf("Test timed-out.")
1382	}
1383}
1384
1385func TestFailFast(t *testing.T) {
1386	defer leakcheck.Check(t)
1387	for _, e := range listTestEnv() {
1388		testFailFast(t, e)
1389	}
1390}
1391
1392func testFailFast(t *testing.T, e env) {
1393	te := newTest(t, e)
1394	te.userAgent = testAppUA
1395	te.declareLogNoise(
1396		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1397		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1398		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1399	)
1400	te.startServer(&testServer{security: e.security})
1401	defer te.tearDown()
1402
1403	cc := te.clientConn()
1404	tc := testpb.NewTestServiceClient(cc)
1405	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1406	defer cancel()
1407	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1408		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1409	}
1410	// Stop the server and tear down all the existing connections.
1411	te.srv.Stop()
1412	// Loop until the server teardown is propagated to the client.
1413	for {
1414		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1415		_, err := tc.EmptyCall(ctx, &testpb.Empty{})
1416		cancel()
1417		if status.Code(err) == codes.Unavailable {
1418			break
1419		}
1420		t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err)
1421		time.Sleep(10 * time.Millisecond)
1422	}
1423	// The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
1424	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
1425		t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
1426	}
1427	if _, err := tc.StreamingInputCall(context.Background()); status.Code(err) != codes.Unavailable {
1428		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
1429	}
1430
1431	awaitNewConnLogOutput()
1432}
1433
1434func testServiceConfigSetup(t *testing.T, e env) *test {
1435	te := newTest(t, e)
1436	te.userAgent = testAppUA
1437	te.declareLogNoise(
1438		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1439		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1440		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1441		"Failed to dial : context canceled; please retry.",
1442	)
1443	return te
1444}
1445
1446func newBool(b bool) (a *bool) {
1447	return &b
1448}
1449
1450func newInt(b int) (a *int) {
1451	return &b
1452}
1453
1454func newDuration(b time.Duration) (a *time.Duration) {
1455	a = new(time.Duration)
1456	*a = b
1457	return
1458}
1459
1460func TestGetMethodConfig(t *testing.T) {
1461	te := testServiceConfigSetup(t, tcpClearRREnv)
1462	defer te.tearDown()
1463	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1464	defer rcleanup()
1465
1466	te.resolverScheme = r.Scheme()
1467	cc := te.clientConn()
1468	r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1469	r.NewServiceConfig(`{
1470    "methodConfig": [
1471        {
1472            "name": [
1473                {
1474                    "service": "grpc.testing.TestService",
1475                    "method": "EmptyCall"
1476                }
1477            ],
1478            "waitForReady": true,
1479            "timeout": ".001s"
1480        },
1481        {
1482            "name": [
1483                {
1484                    "service": "grpc.testing.TestService"
1485                }
1486            ],
1487            "waitForReady": false
1488        }
1489    ]
1490}`)
1491
1492	tc := testpb.NewTestServiceClient(cc)
1493
1494	// Make sure service config has been processed by grpc.
1495	for {
1496		if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
1497			break
1498		}
1499		time.Sleep(time.Millisecond)
1500	}
1501
1502	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1503	var err error
1504	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
1505		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1506	}
1507
1508	r.NewServiceConfig(`{
1509    "methodConfig": [
1510        {
1511            "name": [
1512                {
1513                    "service": "grpc.testing.TestService",
1514                    "method": "UnaryCall"
1515                }
1516            ],
1517            "waitForReady": true,
1518            "timeout": ".001s"
1519        },
1520        {
1521            "name": [
1522                {
1523                    "service": "grpc.testing.TestService"
1524                }
1525            ],
1526            "waitForReady": false
1527        }
1528    ]
1529}`)
1530
1531	// Make sure service config has been processed by grpc.
1532	for {
1533		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
1534			break
1535		}
1536		time.Sleep(time.Millisecond)
1537	}
1538	// The following RPCs are expected to become fail-fast.
1539	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
1540		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
1541	}
1542}
1543
1544func TestServiceConfigWaitForReady(t *testing.T) {
1545	te := testServiceConfigSetup(t, tcpClearRREnv)
1546	defer te.tearDown()
1547	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1548	defer rcleanup()
1549
1550	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
1551	te.resolverScheme = r.Scheme()
1552	cc := te.clientConn()
1553	r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1554	r.NewServiceConfig(`{
1555    "methodConfig": [
1556        {
1557            "name": [
1558                {
1559                    "service": "grpc.testing.TestService",
1560                    "method": "EmptyCall"
1561                },
1562                {
1563                    "service": "grpc.testing.TestService",
1564                    "method": "FullDuplexCall"
1565                }
1566            ],
1567            "waitForReady": false,
1568            "timeout": ".001s"
1569        }
1570    ]
1571}`)
1572
1573	tc := testpb.NewTestServiceClient(cc)
1574
1575	// Make sure service config has been processed by grpc.
1576	for {
1577		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
1578			break
1579		}
1580		time.Sleep(time.Millisecond)
1581	}
1582
1583	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1584	var err error
1585	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
1586		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1587	}
1588	if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
1589		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1590	}
1591
1592	// Generate a service config update.
1593	// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
1594	r.NewServiceConfig(`{
1595    "methodConfig": [
1596        {
1597            "name": [
1598                {
1599                    "service": "grpc.testing.TestService",
1600                    "method": "EmptyCall"
1601                },
1602                {
1603                    "service": "grpc.testing.TestService",
1604                    "method": "FullDuplexCall"
1605                }
1606            ],
1607            "waitForReady": true,
1608            "timeout": ".001s"
1609        }
1610    ]
1611}`)
1612
1613	// Wait for the new service config to take effect.
1614	for {
1615		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
1616			break
1617		}
1618		time.Sleep(time.Millisecond)
1619	}
1620	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1621	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
1622		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1623	}
1624	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
1625		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1626	}
1627}
1628
1629func TestServiceConfigTimeout(t *testing.T) {
1630	te := testServiceConfigSetup(t, tcpClearRREnv)
1631	defer te.tearDown()
1632	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1633	defer rcleanup()
1634
1635	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1636	te.resolverScheme = r.Scheme()
1637	cc := te.clientConn()
1638	r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
1639	r.NewServiceConfig(`{
1640    "methodConfig": [
1641        {
1642            "name": [
1643                {
1644                    "service": "grpc.testing.TestService",
1645                    "method": "EmptyCall"
1646                },
1647                {
1648                    "service": "grpc.testing.TestService",
1649                    "method": "FullDuplexCall"
1650                }
1651            ],
1652            "waitForReady": true,
1653            "timeout": "3600s"
1654        }
1655    ]
1656}`)
1657
1658	tc := testpb.NewTestServiceClient(cc)
1659
1660	// Make sure service config has been processed by grpc.
1661	for {
1662		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
1663			break
1664		}
1665		time.Sleep(time.Millisecond)
1666	}
1667
1668	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
1669	var err error
1670	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
1671	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
1672		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1673	}
1674	cancel()
1675
1676	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
1677	if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
1678		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1679	}
1680	cancel()
1681
1682	// Generate a service config update.
1683	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1684	r.NewServiceConfig(`{
1685    "methodConfig": [
1686        {
1687            "name": [
1688                {
1689                    "service": "grpc.testing.TestService",
1690                    "method": "EmptyCall"
1691                },
1692                {
1693                    "service": "grpc.testing.TestService",
1694                    "method": "FullDuplexCall"
1695                }
1696            ],
1697            "waitForReady": true,
1698            "timeout": ".000000001s"
1699        }
1700    ]
1701}`)
1702
1703	// Wait for the new service config to take effect.
1704	for {
1705		if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
1706			break
1707		}
1708		time.Sleep(time.Millisecond)
1709	}
1710
1711	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1712	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
1713		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1714	}
1715	cancel()
1716
1717	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1718	if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
1719		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1720	}
1721	cancel()
1722}
1723
1724func TestServiceConfigMaxMsgSize(t *testing.T) {
1725	e := tcpClearRREnv
1726	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1727	defer rcleanup()
1728
1729	// Setting up values and objects shared across all test cases.
1730	const smallSize = 1
1731	const largeSize = 1024
1732	const extraLargeSize = 2048
1733
1734	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1735	if err != nil {
1736		t.Fatal(err)
1737	}
1738	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1739	if err != nil {
1740		t.Fatal(err)
1741	}
1742	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
1743	if err != nil {
1744		t.Fatal(err)
1745	}
1746
1747	scjs := `{
1748    "methodConfig": [
1749        {
1750            "name": [
1751                {
1752                    "service": "grpc.testing.TestService",
1753                    "method": "UnaryCall"
1754                },
1755                {
1756                    "service": "grpc.testing.TestService",
1757                    "method": "FullDuplexCall"
1758                }
1759            ],
1760            "maxRequestMessageBytes": 2048,
1761            "maxResponseMessageBytes": 2048
1762        }
1763    ]
1764}`
1765
1766	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1767	te1 := testServiceConfigSetup(t, e)
1768	defer te1.tearDown()
1769
1770	te1.resolverScheme = r.Scheme()
1771	te1.nonBlockingDial = true
1772	te1.startServer(&testServer{security: e.security})
1773	cc1 := te1.clientConn()
1774
1775	r.NewAddress([]resolver.Address{{Addr: te1.srvAddr}})
1776	r.NewServiceConfig(scjs)
1777	tc := testpb.NewTestServiceClient(cc1)
1778
1779	req := &testpb.SimpleRequest{
1780		ResponseType: testpb.PayloadType_COMPRESSABLE,
1781		ResponseSize: int32(extraLargeSize),
1782		Payload:      smallPayload,
1783	}
1784
1785	for {
1786		if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1787			break
1788		}
1789		time.Sleep(time.Millisecond)
1790	}
1791
1792	// Test for unary RPC recv.
1793	if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted {
1794		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1795	}
1796
1797	// Test for unary RPC send.
1798	req.Payload = extraLargePayload
1799	req.ResponseSize = int32(smallSize)
1800	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1801		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1802	}
1803
1804	// Test for streaming RPC recv.
1805	respParam := []*testpb.ResponseParameters{
1806		{
1807			Size: int32(extraLargeSize),
1808		},
1809	}
1810	sreq := &testpb.StreamingOutputCallRequest{
1811		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1812		ResponseParameters: respParam,
1813		Payload:            smallPayload,
1814	}
1815	stream, err := tc.FullDuplexCall(te1.ctx)
1816	if err != nil {
1817		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1818	}
1819	if err = stream.Send(sreq); err != nil {
1820		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1821	}
1822	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1823		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1824	}
1825
1826	// Test for streaming RPC send.
1827	respParam[0].Size = int32(smallSize)
1828	sreq.Payload = extraLargePayload
1829	stream, err = tc.FullDuplexCall(te1.ctx)
1830	if err != nil {
1831		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1832	}
1833	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1834		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1835	}
1836
1837	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1838	te2 := testServiceConfigSetup(t, e)
1839	te2.resolverScheme = r.Scheme()
1840	te2.nonBlockingDial = true
1841	te2.maxClientReceiveMsgSize = newInt(1024)
1842	te2.maxClientSendMsgSize = newInt(1024)
1843
1844	te2.startServer(&testServer{security: e.security})
1845	defer te2.tearDown()
1846	cc2 := te2.clientConn()
1847	r.NewAddress([]resolver.Address{{Addr: te2.srvAddr}})
1848	r.NewServiceConfig(scjs)
1849	tc = testpb.NewTestServiceClient(cc2)
1850
1851	for {
1852		if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1853			break
1854		}
1855		time.Sleep(time.Millisecond)
1856	}
1857
1858	// Test for unary RPC recv.
1859	req.Payload = smallPayload
1860	req.ResponseSize = int32(largeSize)
1861
1862	if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || status.Code(err) != codes.ResourceExhausted {
1863		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1864	}
1865
1866	// Test for unary RPC send.
1867	req.Payload = largePayload
1868	req.ResponseSize = int32(smallSize)
1869	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1870		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1871	}
1872
1873	// Test for streaming RPC recv.
1874	stream, err = tc.FullDuplexCall(te2.ctx)
1875	respParam[0].Size = int32(largeSize)
1876	sreq.Payload = smallPayload
1877	if err != nil {
1878		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1879	}
1880	if err = stream.Send(sreq); err != nil {
1881		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1882	}
1883	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1884		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1885	}
1886
1887	// Test for streaming RPC send.
1888	respParam[0].Size = int32(smallSize)
1889	sreq.Payload = largePayload
1890	stream, err = tc.FullDuplexCall(te2.ctx)
1891	if err != nil {
1892		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1893	}
1894	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1895		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1896	}
1897
1898	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1899	te3 := testServiceConfigSetup(t, e)
1900	te3.resolverScheme = r.Scheme()
1901	te3.nonBlockingDial = true
1902	te3.maxClientReceiveMsgSize = newInt(4096)
1903	te3.maxClientSendMsgSize = newInt(4096)
1904
1905	te3.startServer(&testServer{security: e.security})
1906	defer te3.tearDown()
1907
1908	cc3 := te3.clientConn()
1909	r.NewAddress([]resolver.Address{{Addr: te3.srvAddr}})
1910	r.NewServiceConfig(scjs)
1911	tc = testpb.NewTestServiceClient(cc3)
1912
1913	for {
1914		if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1915			break
1916		}
1917		time.Sleep(time.Millisecond)
1918	}
1919
1920	// Test for unary RPC recv.
1921	req.Payload = smallPayload
1922	req.ResponseSize = int32(largeSize)
1923
1924	if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err != nil {
1925		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1926	}
1927
1928	req.ResponseSize = int32(extraLargeSize)
1929	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1930		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1931	}
1932
1933	// Test for unary RPC send.
1934	req.Payload = largePayload
1935	req.ResponseSize = int32(smallSize)
1936	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
1937		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1938	}
1939
1940	req.Payload = extraLargePayload
1941	if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1942		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1943	}
1944
1945	// Test for streaming RPC recv.
1946	stream, err = tc.FullDuplexCall(te3.ctx)
1947	if err != nil {
1948		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1949	}
1950	respParam[0].Size = int32(largeSize)
1951	sreq.Payload = smallPayload
1952
1953	if err = stream.Send(sreq); err != nil {
1954		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1955	}
1956	if _, err = stream.Recv(); err != nil {
1957		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
1958	}
1959
1960	respParam[0].Size = int32(extraLargeSize)
1961
1962	if err = stream.Send(sreq); err != nil {
1963		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1964	}
1965	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1966		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1967	}
1968
1969	// Test for streaming RPC send.
1970	respParam[0].Size = int32(smallSize)
1971	sreq.Payload = largePayload
1972	stream, err = tc.FullDuplexCall(te3.ctx)
1973	if err != nil {
1974		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1975	}
1976	if err := stream.Send(sreq); err != nil {
1977		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1978	}
1979	sreq.Payload = extraLargePayload
1980	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1981		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1982	}
1983}
1984
1985// Reading from a streaming RPC may fail with context canceled if timeout was
1986// set by service config (https://github.com/grpc/grpc-go/issues/1818). This
1987// test makes sure read from streaming RPC doesn't fail in this case.
1988func TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
1989	te := testServiceConfigSetup(t, tcpClearRREnv)
1990	te.startServer(&testServer{security: tcpClearRREnv.security})
1991	defer te.tearDown()
1992	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1993	defer rcleanup()
1994
1995	te.resolverScheme = r.Scheme()
1996	te.nonBlockingDial = true
1997	cc := te.clientConn()
1998	tc := testpb.NewTestServiceClient(cc)
1999
2000	r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
2001	r.NewServiceConfig(`{
2002	    "methodConfig": [
2003	        {
2004	            "name": [
2005	                {
2006	                    "service": "grpc.testing.TestService",
2007	                    "method": "FullDuplexCall"
2008	                }
2009	            ],
2010	            "waitForReady": true,
2011	            "timeout": "10s"
2012	        }
2013	    ]
2014	}`)
2015	// Make sure service config has been processed by grpc.
2016	for {
2017		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
2018			break
2019		}
2020		time.Sleep(time.Millisecond)
2021	}
2022
2023	ctx, cancel := context.WithCancel(context.Background())
2024	defer cancel()
2025	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
2026	if err != nil {
2027		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
2028	}
2029
2030	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0)
2031	if err != nil {
2032		t.Fatalf("failed to newPayload: %v", err)
2033	}
2034	req := &testpb.StreamingOutputCallRequest{
2035		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2036		ResponseParameters: []*testpb.ResponseParameters{{Size: 0}},
2037		Payload:            payload,
2038	}
2039	if err := stream.Send(req); err != nil {
2040		t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err)
2041	}
2042	stream.CloseSend()
2043	time.Sleep(time.Second)
2044	// Sleep 1 second before recv to make sure the final status is received
2045	// before the recv.
2046	if _, err := stream.Recv(); err != nil {
2047		t.Fatalf("stream.Recv = _, %v, want _, <nil>", err)
2048	}
2049	// Keep reading to drain the stream.
2050	for {
2051		if _, err := stream.Recv(); err != nil {
2052			break
2053		}
2054	}
2055}
2056
2057func TestMaxMsgSizeClientDefault(t *testing.T) {
2058	defer leakcheck.Check(t)
2059	for _, e := range listTestEnv() {
2060		testMaxMsgSizeClientDefault(t, e)
2061	}
2062}
2063
2064func testMaxMsgSizeClientDefault(t *testing.T, e env) {
2065	te := newTest(t, e)
2066	te.userAgent = testAppUA
2067	te.declareLogNoise(
2068		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2069		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2070		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2071		"Failed to dial : context canceled; please retry.",
2072	)
2073	te.startServer(&testServer{security: e.security})
2074
2075	defer te.tearDown()
2076	tc := testpb.NewTestServiceClient(te.clientConn())
2077
2078	const smallSize = 1
2079	const largeSize = 4 * 1024 * 1024
2080	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2081	if err != nil {
2082		t.Fatal(err)
2083	}
2084	req := &testpb.SimpleRequest{
2085		ResponseType: testpb.PayloadType_COMPRESSABLE,
2086		ResponseSize: int32(largeSize),
2087		Payload:      smallPayload,
2088	}
2089	// Test for unary RPC recv.
2090	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2091		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2092	}
2093
2094	respParam := []*testpb.ResponseParameters{
2095		{
2096			Size: int32(largeSize),
2097		},
2098	}
2099	sreq := &testpb.StreamingOutputCallRequest{
2100		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2101		ResponseParameters: respParam,
2102		Payload:            smallPayload,
2103	}
2104
2105	// Test for streaming RPC recv.
2106	stream, err := tc.FullDuplexCall(te.ctx)
2107	if err != nil {
2108		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2109	}
2110	if err := stream.Send(sreq); err != nil {
2111		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2112	}
2113	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2114		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2115	}
2116}
2117
2118func TestMaxMsgSizeClientAPI(t *testing.T) {
2119	defer leakcheck.Check(t)
2120	for _, e := range listTestEnv() {
2121		testMaxMsgSizeClientAPI(t, e)
2122	}
2123}
2124
2125func testMaxMsgSizeClientAPI(t *testing.T, e env) {
2126	te := newTest(t, e)
2127	te.userAgent = testAppUA
2128	// To avoid error on server side.
2129	te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
2130	te.maxClientReceiveMsgSize = newInt(1024)
2131	te.maxClientSendMsgSize = newInt(1024)
2132	te.declareLogNoise(
2133		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2134		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2135		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2136		"Failed to dial : context canceled; please retry.",
2137	)
2138	te.startServer(&testServer{security: e.security})
2139
2140	defer te.tearDown()
2141	tc := testpb.NewTestServiceClient(te.clientConn())
2142
2143	const smallSize = 1
2144	const largeSize = 1024
2145	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2146	if err != nil {
2147		t.Fatal(err)
2148	}
2149
2150	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2151	if err != nil {
2152		t.Fatal(err)
2153	}
2154	req := &testpb.SimpleRequest{
2155		ResponseType: testpb.PayloadType_COMPRESSABLE,
2156		ResponseSize: int32(largeSize),
2157		Payload:      smallPayload,
2158	}
2159	// Test for unary RPC recv.
2160	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2161		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2162	}
2163
2164	// Test for unary RPC send.
2165	req.Payload = largePayload
2166	req.ResponseSize = int32(smallSize)
2167	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2168		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2169	}
2170
2171	respParam := []*testpb.ResponseParameters{
2172		{
2173			Size: int32(largeSize),
2174		},
2175	}
2176	sreq := &testpb.StreamingOutputCallRequest{
2177		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2178		ResponseParameters: respParam,
2179		Payload:            smallPayload,
2180	}
2181
2182	// Test for streaming RPC recv.
2183	stream, err := tc.FullDuplexCall(te.ctx)
2184	if err != nil {
2185		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2186	}
2187	if err := stream.Send(sreq); err != nil {
2188		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2189	}
2190	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2191		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2192	}
2193
2194	// Test for streaming RPC send.
2195	respParam[0].Size = int32(smallSize)
2196	sreq.Payload = largePayload
2197	stream, err = tc.FullDuplexCall(te.ctx)
2198	if err != nil {
2199		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2200	}
2201	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
2202		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
2203	}
2204}
2205
2206func TestMaxMsgSizeServerAPI(t *testing.T) {
2207	defer leakcheck.Check(t)
2208	for _, e := range listTestEnv() {
2209		testMaxMsgSizeServerAPI(t, e)
2210	}
2211}
2212
2213func testMaxMsgSizeServerAPI(t *testing.T, e env) {
2214	te := newTest(t, e)
2215	te.userAgent = testAppUA
2216	te.maxServerReceiveMsgSize = newInt(1024)
2217	te.maxServerSendMsgSize = newInt(1024)
2218	te.declareLogNoise(
2219		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2220		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2221		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2222		"Failed to dial : context canceled; please retry.",
2223	)
2224	te.startServer(&testServer{security: e.security})
2225
2226	defer te.tearDown()
2227	tc := testpb.NewTestServiceClient(te.clientConn())
2228
2229	const smallSize = 1
2230	const largeSize = 1024
2231	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2232	if err != nil {
2233		t.Fatal(err)
2234	}
2235
2236	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2237	if err != nil {
2238		t.Fatal(err)
2239	}
2240	req := &testpb.SimpleRequest{
2241		ResponseType: testpb.PayloadType_COMPRESSABLE,
2242		ResponseSize: int32(largeSize),
2243		Payload:      smallPayload,
2244	}
2245	// Test for unary RPC send.
2246	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2247		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2248	}
2249
2250	// Test for unary RPC recv.
2251	req.Payload = largePayload
2252	req.ResponseSize = int32(smallSize)
2253	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2254		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2255	}
2256
2257	respParam := []*testpb.ResponseParameters{
2258		{
2259			Size: int32(largeSize),
2260		},
2261	}
2262	sreq := &testpb.StreamingOutputCallRequest{
2263		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2264		ResponseParameters: respParam,
2265		Payload:            smallPayload,
2266	}
2267
2268	// Test for streaming RPC send.
2269	stream, err := tc.FullDuplexCall(te.ctx)
2270	if err != nil {
2271		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2272	}
2273	if err := stream.Send(sreq); err != nil {
2274		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2275	}
2276	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2277		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2278	}
2279
2280	// Test for streaming RPC recv.
2281	respParam[0].Size = int32(smallSize)
2282	sreq.Payload = largePayload
2283	stream, err = tc.FullDuplexCall(te.ctx)
2284	if err != nil {
2285		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2286	}
2287	if err := stream.Send(sreq); err != nil {
2288		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2289	}
2290	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2291		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2292	}
2293}
2294
2295func TestTap(t *testing.T) {
2296	defer leakcheck.Check(t)
2297	for _, e := range listTestEnv() {
2298		if e.name == "handler-tls" {
2299			continue
2300		}
2301		testTap(t, e)
2302	}
2303}
2304
2305type myTap struct {
2306	cnt int
2307}
2308
2309func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) {
2310	if info != nil {
2311		if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" {
2312			t.cnt++
2313		} else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" {
2314			return nil, fmt.Errorf("tap error")
2315		}
2316	}
2317	return ctx, nil
2318}
2319
2320func testTap(t *testing.T, e env) {
2321	te := newTest(t, e)
2322	te.userAgent = testAppUA
2323	ttap := &myTap{}
2324	te.tapHandle = ttap.handle
2325	te.declareLogNoise(
2326		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2327		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2328		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2329	)
2330	te.startServer(&testServer{security: e.security})
2331	defer te.tearDown()
2332
2333	cc := te.clientConn()
2334	tc := testpb.NewTestServiceClient(cc)
2335	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
2336		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2337	}
2338	if ttap.cnt != 1 {
2339		t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt)
2340	}
2341
2342	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31)
2343	if err != nil {
2344		t.Fatal(err)
2345	}
2346
2347	req := &testpb.SimpleRequest{
2348		ResponseType: testpb.PayloadType_COMPRESSABLE,
2349		ResponseSize: 45,
2350		Payload:      payload,
2351	}
2352	if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable {
2353		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
2354	}
2355}
2356
2357func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
2358	ctx, cancel := context.WithTimeout(context.Background(), d)
2359	defer cancel()
2360	hc := healthgrpc.NewHealthClient(cc)
2361	req := &healthpb.HealthCheckRequest{
2362		Service: serviceName,
2363	}
2364	return hc.Check(ctx, req)
2365}
2366
2367func TestHealthCheckOnSuccess(t *testing.T) {
2368	defer leakcheck.Check(t)
2369	for _, e := range listTestEnv() {
2370		testHealthCheckOnSuccess(t, e)
2371	}
2372}
2373
2374func testHealthCheckOnSuccess(t *testing.T, e env) {
2375	te := newTest(t, e)
2376	hs := health.NewServer()
2377	hs.SetServingStatus("grpc.health.v1.Health", 1)
2378	te.healthServer = hs
2379	te.startServer(&testServer{security: e.security})
2380	defer te.tearDown()
2381
2382	cc := te.clientConn()
2383	if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); err != nil {
2384		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2385	}
2386}
2387
2388func TestHealthCheckOnFailure(t *testing.T) {
2389	defer leakcheck.Check(t)
2390	for _, e := range listTestEnv() {
2391		testHealthCheckOnFailure(t, e)
2392	}
2393}
2394
2395func testHealthCheckOnFailure(t *testing.T, e env) {
2396	defer leakcheck.Check(t)
2397	te := newTest(t, e)
2398	te.declareLogNoise(
2399		"Failed to dial ",
2400		"grpc: the client connection is closing; please retry",
2401	)
2402	hs := health.NewServer()
2403	hs.SetServingStatus("grpc.health.v1.HealthCheck", 1)
2404	te.healthServer = hs
2405	te.startServer(&testServer{security: e.security})
2406	defer te.tearDown()
2407
2408	cc := te.clientConn()
2409	wantErr := status.Error(codes.DeadlineExceeded, "context deadline exceeded")
2410	if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2411		t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.DeadlineExceeded)
2412	}
2413	awaitNewConnLogOutput()
2414}
2415
2416func TestHealthCheckOff(t *testing.T) {
2417	defer leakcheck.Check(t)
2418	for _, e := range listTestEnv() {
2419		// TODO(bradfitz): Temporarily skip this env due to #619.
2420		if e.name == "handler-tls" {
2421			continue
2422		}
2423		testHealthCheckOff(t, e)
2424	}
2425}
2426
2427func testHealthCheckOff(t *testing.T, e env) {
2428	te := newTest(t, e)
2429	te.startServer(&testServer{security: e.security})
2430	defer te.tearDown()
2431	want := status.Error(codes.Unimplemented, "unknown service grpc.health.v1.Health")
2432	if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2433		t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
2434	}
2435}
2436
2437func TestHealthWatchMultipleClients(t *testing.T) {
2438	defer leakcheck.Check(t)
2439	for _, e := range listTestEnv() {
2440		testHealthWatchMultipleClients(t, e)
2441	}
2442}
2443
2444func testHealthWatchMultipleClients(t *testing.T, e env) {
2445	const service = "grpc.health.v1.Health1"
2446
2447	hs := health.NewServer()
2448
2449	te := newTest(t, e)
2450	te.healthServer = hs
2451	te.startServer(&testServer{security: e.security})
2452	defer te.tearDown()
2453
2454	cc := te.clientConn()
2455	hc := healthgrpc.NewHealthClient(cc)
2456
2457	ctx, cancel := context.WithCancel(context.Background())
2458	defer cancel()
2459
2460	req := &healthpb.HealthCheckRequest{
2461		Service: service,
2462	}
2463
2464	stream1, err := hc.Watch(ctx, req)
2465	if err != nil {
2466		t.Fatalf("error: %v", err)
2467	}
2468
2469	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2470
2471	stream2, err := hc.Watch(ctx, req)
2472	if err != nil {
2473		t.Fatalf("error: %v", err)
2474	}
2475
2476	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2477
2478	hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
2479
2480	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
2481	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING)
2482}
2483
2484func TestHealthWatchSameStatus(t *testing.T) {
2485	defer leakcheck.Check(t)
2486	for _, e := range listTestEnv() {
2487		testHealthWatchSameStatus(t, e)
2488	}
2489}
2490
2491func testHealthWatchSameStatus(t *testing.T, e env) {
2492	const service = "grpc.health.v1.Health1"
2493
2494	hs := health.NewServer()
2495
2496	te := newTest(t, e)
2497	te.healthServer = hs
2498	te.startServer(&testServer{security: e.security})
2499	defer te.tearDown()
2500
2501	cc := te.clientConn()
2502	hc := healthgrpc.NewHealthClient(cc)
2503
2504	ctx, cancel := context.WithCancel(context.Background())
2505	defer cancel()
2506
2507	req := &healthpb.HealthCheckRequest{
2508		Service: service,
2509	}
2510
2511	stream1, err := hc.Watch(ctx, req)
2512	if err != nil {
2513		t.Fatalf("error: %v", err)
2514	}
2515
2516	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2517
2518	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2519
2520	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVING)
2521
2522	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2523	hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
2524
2525	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
2526}
2527
2528func TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) {
2529	defer leakcheck.Check(t)
2530	for _, e := range listTestEnv() {
2531		testHealthWatchSetServiceStatusBeforeStartingServer(t, e)
2532	}
2533}
2534
2535func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) {
2536	const service = "grpc.health.v1.Health1"
2537
2538	hs := health.NewServer()
2539
2540	te := newTest(t, e)
2541	te.healthServer = hs
2542
2543	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2544
2545	te.startServer(&testServer{security: e.security})
2546	defer te.tearDown()
2547
2548	cc := te.clientConn()
2549	hc := healthgrpc.NewHealthClient(cc)
2550
2551	ctx, cancel := context.WithCancel(context.Background())
2552	defer cancel()
2553
2554	req := &healthpb.HealthCheckRequest{
2555		Service: service,
2556	}
2557
2558	stream, err := hc.Watch(ctx, req)
2559	if err != nil {
2560		t.Fatalf("error: %v", err)
2561	}
2562
2563	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2564}
2565
2566func TestHealthWatchDefaultStatusChange(t *testing.T) {
2567	defer leakcheck.Check(t)
2568	for _, e := range listTestEnv() {
2569		testHealthWatchDefaultStatusChange(t, e)
2570	}
2571}
2572
2573func testHealthWatchDefaultStatusChange(t *testing.T, e env) {
2574	const service = "grpc.health.v1.Health1"
2575
2576	hs := health.NewServer()
2577
2578	te := newTest(t, e)
2579	te.healthServer = hs
2580	te.startServer(&testServer{security: e.security})
2581	defer te.tearDown()
2582
2583	cc := te.clientConn()
2584	hc := healthgrpc.NewHealthClient(cc)
2585
2586	ctx, cancel := context.WithCancel(context.Background())
2587	defer cancel()
2588
2589	req := &healthpb.HealthCheckRequest{
2590		Service: service,
2591	}
2592
2593	stream, err := hc.Watch(ctx, req)
2594	if err != nil {
2595		t.Fatalf("error: %v", err)
2596	}
2597
2598	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2599
2600	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2601
2602	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2603}
2604
2605func TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) {
2606	defer leakcheck.Check(t)
2607	for _, e := range listTestEnv() {
2608		testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e)
2609	}
2610}
2611
2612func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) {
2613	const service = "grpc.health.v1.Health1"
2614
2615	hs := health.NewServer()
2616
2617	te := newTest(t, e)
2618	te.healthServer = hs
2619	te.startServer(&testServer{security: e.security})
2620	defer te.tearDown()
2621
2622	cc := te.clientConn()
2623	hc := healthgrpc.NewHealthClient(cc)
2624
2625	ctx, cancel := context.WithCancel(context.Background())
2626	defer cancel()
2627
2628	req := &healthpb.HealthCheckRequest{
2629		Service: service,
2630	}
2631
2632	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2633
2634	stream, err := hc.Watch(ctx, req)
2635	if err != nil {
2636		t.Fatalf("error: %v", err)
2637	}
2638
2639	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2640}
2641
2642func TestHealthWatchOverallServerHealthChange(t *testing.T) {
2643	defer leakcheck.Check(t)
2644	for _, e := range listTestEnv() {
2645		testHealthWatchOverallServerHealthChange(t, e)
2646	}
2647}
2648
2649func testHealthWatchOverallServerHealthChange(t *testing.T, e env) {
2650	const service = ""
2651
2652	hs := health.NewServer()
2653
2654	te := newTest(t, e)
2655	te.healthServer = hs
2656	te.startServer(&testServer{security: e.security})
2657	defer te.tearDown()
2658
2659	cc := te.clientConn()
2660	hc := healthgrpc.NewHealthClient(cc)
2661
2662	ctx, cancel := context.WithCancel(context.Background())
2663	defer cancel()
2664
2665	req := &healthpb.HealthCheckRequest{
2666		Service: service,
2667	}
2668
2669	stream, err := hc.Watch(ctx, req)
2670	if err != nil {
2671		t.Fatalf("error: %v", err)
2672	}
2673
2674	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2675
2676	hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
2677
2678	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
2679}
2680
2681func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, expectedServingStatus healthpb.HealthCheckResponse_ServingStatus) {
2682	response, err := stream.Recv()
2683	if err != nil {
2684		t.Fatalf("error on %v.Recv(): %v", stream, err)
2685	}
2686	if response.Status != expectedServingStatus {
2687		t.Fatalf("response.Status is %v (%v expected)", response.Status, expectedServingStatus)
2688	}
2689}
2690
2691func TestUnknownHandler(t *testing.T) {
2692	defer leakcheck.Check(t)
2693	// An example unknownHandler that returns a different code and a different method, making sure that we do not
2694	// expose what methods are implemented to a client that is not authenticated.
2695	unknownHandler := func(srv interface{}, stream grpc.ServerStream) error {
2696		return status.Error(codes.Unauthenticated, "user unauthenticated")
2697	}
2698	for _, e := range listTestEnv() {
2699		// TODO(bradfitz): Temporarily skip this env due to #619.
2700		if e.name == "handler-tls" {
2701			continue
2702		}
2703		testUnknownHandler(t, e, unknownHandler)
2704	}
2705}
2706
2707func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
2708	te := newTest(t, e)
2709	te.unknownHandler = unknownHandler
2710	te.startServer(&testServer{security: e.security})
2711	defer te.tearDown()
2712	want := status.Error(codes.Unauthenticated, "user unauthenticated")
2713	if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2714		t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
2715	}
2716}
2717
2718func TestHealthCheckServingStatus(t *testing.T) {
2719	defer leakcheck.Check(t)
2720	for _, e := range listTestEnv() {
2721		testHealthCheckServingStatus(t, e)
2722	}
2723}
2724
2725func testHealthCheckServingStatus(t *testing.T, e env) {
2726	te := newTest(t, e)
2727	hs := health.NewServer()
2728	te.healthServer = hs
2729	te.startServer(&testServer{security: e.security})
2730	defer te.tearDown()
2731
2732	cc := te.clientConn()
2733	out, err := healthCheck(1*time.Second, cc, "")
2734	if err != nil {
2735		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2736	}
2737	if out.Status != healthpb.HealthCheckResponse_SERVING {
2738		t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2739	}
2740	wantErr := status.Error(codes.NotFound, "unknown service")
2741	if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2742		t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.NotFound)
2743	}
2744	hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_SERVING)
2745	out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2746	if err != nil {
2747		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2748	}
2749	if out.Status != healthpb.HealthCheckResponse_SERVING {
2750		t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2751	}
2752	hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_NOT_SERVING)
2753	out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2754	if err != nil {
2755		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2756	}
2757	if out.Status != healthpb.HealthCheckResponse_NOT_SERVING {
2758		t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status)
2759	}
2760
2761}
2762
2763func TestEmptyUnaryWithUserAgent(t *testing.T) {
2764	defer leakcheck.Check(t)
2765	for _, e := range listTestEnv() {
2766		testEmptyUnaryWithUserAgent(t, e)
2767	}
2768}
2769
2770func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
2771	te := newTest(t, e)
2772	te.userAgent = testAppUA
2773	te.startServer(&testServer{security: e.security})
2774	defer te.tearDown()
2775
2776	cc := te.clientConn()
2777	tc := testpb.NewTestServiceClient(cc)
2778	var header metadata.MD
2779	reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header))
2780	if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
2781		t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
2782	}
2783	if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
2784		t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
2785	}
2786
2787	te.srv.Stop()
2788}
2789
2790func TestFailedEmptyUnary(t *testing.T) {
2791	defer leakcheck.Check(t)
2792	for _, e := range listTestEnv() {
2793		if e.name == "handler-tls" {
2794			// This test covers status details, but
2795			// Grpc-Status-Details-Bin is not support in handler_server.
2796			continue
2797		}
2798		testFailedEmptyUnary(t, e)
2799	}
2800}
2801
2802func testFailedEmptyUnary(t *testing.T, e env) {
2803	te := newTest(t, e)
2804	te.userAgent = failAppUA
2805	te.startServer(&testServer{security: e.security})
2806	defer te.tearDown()
2807	tc := testpb.NewTestServiceClient(te.clientConn())
2808
2809	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2810	wantErr := detailedError
2811	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) {
2812		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
2813	}
2814}
2815
2816func TestLargeUnary(t *testing.T) {
2817	defer leakcheck.Check(t)
2818	for _, e := range listTestEnv() {
2819		testLargeUnary(t, e)
2820	}
2821}
2822
2823func testLargeUnary(t *testing.T, e env) {
2824	te := newTest(t, e)
2825	te.startServer(&testServer{security: e.security})
2826	defer te.tearDown()
2827	tc := testpb.NewTestServiceClient(te.clientConn())
2828
2829	const argSize = 271828
2830	const respSize = 314159
2831
2832	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2833	if err != nil {
2834		t.Fatal(err)
2835	}
2836
2837	req := &testpb.SimpleRequest{
2838		ResponseType: testpb.PayloadType_COMPRESSABLE,
2839		ResponseSize: respSize,
2840		Payload:      payload,
2841	}
2842	reply, err := tc.UnaryCall(context.Background(), req)
2843	if err != nil {
2844		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2845	}
2846	pt := reply.GetPayload().GetType()
2847	ps := len(reply.GetPayload().GetBody())
2848	if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2849		t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
2850	}
2851}
2852
2853// Test backward-compatibility API for setting msg size limit.
2854func TestExceedMsgLimit(t *testing.T) {
2855	defer leakcheck.Check(t)
2856	for _, e := range listTestEnv() {
2857		testExceedMsgLimit(t, e)
2858	}
2859}
2860
2861func testExceedMsgLimit(t *testing.T, e env) {
2862	te := newTest(t, e)
2863	te.maxMsgSize = newInt(1024)
2864	te.startServer(&testServer{security: e.security})
2865	defer te.tearDown()
2866	tc := testpb.NewTestServiceClient(te.clientConn())
2867
2868	argSize := int32(*te.maxMsgSize + 1)
2869	const smallSize = 1
2870
2871	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2872	if err != nil {
2873		t.Fatal(err)
2874	}
2875	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2876	if err != nil {
2877		t.Fatal(err)
2878	}
2879
2880	// Test on server side for unary RPC.
2881	req := &testpb.SimpleRequest{
2882		ResponseType: testpb.PayloadType_COMPRESSABLE,
2883		ResponseSize: smallSize,
2884		Payload:      payload,
2885	}
2886	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2887		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2888	}
2889	// Test on client side for unary RPC.
2890	req.ResponseSize = int32(*te.maxMsgSize) + 1
2891	req.Payload = smallPayload
2892	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2893		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2894	}
2895
2896	// Test on server side for streaming RPC.
2897	stream, err := tc.FullDuplexCall(te.ctx)
2898	if err != nil {
2899		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2900	}
2901	respParam := []*testpb.ResponseParameters{
2902		{
2903			Size: 1,
2904		},
2905	}
2906
2907	spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
2908	if err != nil {
2909		t.Fatal(err)
2910	}
2911
2912	sreq := &testpb.StreamingOutputCallRequest{
2913		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2914		ResponseParameters: respParam,
2915		Payload:            spayload,
2916	}
2917	if err := stream.Send(sreq); err != nil {
2918		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2919	}
2920	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2921		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2922	}
2923
2924	// Test on client side for streaming RPC.
2925	stream, err = tc.FullDuplexCall(te.ctx)
2926	if err != nil {
2927		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2928	}
2929	respParam[0].Size = int32(*te.maxMsgSize) + 1
2930	sreq.Payload = smallPayload
2931	if err := stream.Send(sreq); err != nil {
2932		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2933	}
2934	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2935		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2936	}
2937
2938}
2939
2940func TestPeerClientSide(t *testing.T) {
2941	defer leakcheck.Check(t)
2942	for _, e := range listTestEnv() {
2943		testPeerClientSide(t, e)
2944	}
2945}
2946
2947func testPeerClientSide(t *testing.T, e env) {
2948	te := newTest(t, e)
2949	te.userAgent = testAppUA
2950	te.startServer(&testServer{security: e.security})
2951	defer te.tearDown()
2952	tc := testpb.NewTestServiceClient(te.clientConn())
2953	peer := new(peer.Peer)
2954	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.FailFast(false)); err != nil {
2955		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2956	}
2957	pa := peer.Addr.String()
2958	if e.network == "unix" {
2959		if pa != te.srvAddr {
2960			t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2961		}
2962		return
2963	}
2964	_, pp, err := net.SplitHostPort(pa)
2965	if err != nil {
2966		t.Fatalf("Failed to parse address from peer.")
2967	}
2968	_, sp, err := net.SplitHostPort(te.srvAddr)
2969	if err != nil {
2970		t.Fatalf("Failed to parse address of test server.")
2971	}
2972	if pp != sp {
2973		t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2974	}
2975}
2976
2977// TestPeerNegative tests that if call fails setting peer
2978// doesn't cause a segmentation fault.
2979// issue#1141 https://github.com/grpc/grpc-go/issues/1141
2980func TestPeerNegative(t *testing.T) {
2981	defer leakcheck.Check(t)
2982	for _, e := range listTestEnv() {
2983		testPeerNegative(t, e)
2984	}
2985}
2986
2987func testPeerNegative(t *testing.T, e env) {
2988	te := newTest(t, e)
2989	te.startServer(&testServer{security: e.security})
2990	defer te.tearDown()
2991
2992	cc := te.clientConn()
2993	tc := testpb.NewTestServiceClient(cc)
2994	peer := new(peer.Peer)
2995	ctx, cancel := context.WithCancel(context.Background())
2996	cancel()
2997	tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
2998}
2999
3000func TestPeerFailedRPC(t *testing.T) {
3001	defer leakcheck.Check(t)
3002	for _, e := range listTestEnv() {
3003		testPeerFailedRPC(t, e)
3004	}
3005}
3006
3007func testPeerFailedRPC(t *testing.T, e env) {
3008	te := newTest(t, e)
3009	te.maxServerReceiveMsgSize = newInt(1 * 1024)
3010	te.startServer(&testServer{security: e.security})
3011
3012	defer te.tearDown()
3013	tc := testpb.NewTestServiceClient(te.clientConn())
3014
3015	// first make a successful request to the server
3016	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
3017		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
3018	}
3019
3020	// make a second request that will be rejected by the server
3021	const largeSize = 5 * 1024
3022	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
3023	if err != nil {
3024		t.Fatal(err)
3025	}
3026	req := &testpb.SimpleRequest{
3027		ResponseType: testpb.PayloadType_COMPRESSABLE,
3028		Payload:      largePayload,
3029	}
3030
3031	peer := new(peer.Peer)
3032	if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted {
3033		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
3034	} else {
3035		pa := peer.Addr.String()
3036		if e.network == "unix" {
3037			if pa != te.srvAddr {
3038				t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
3039			}
3040			return
3041		}
3042		_, pp, err := net.SplitHostPort(pa)
3043		if err != nil {
3044			t.Fatalf("Failed to parse address from peer.")
3045		}
3046		_, sp, err := net.SplitHostPort(te.srvAddr)
3047		if err != nil {
3048			t.Fatalf("Failed to parse address of test server.")
3049		}
3050		if pp != sp {
3051			t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
3052		}
3053	}
3054}
3055
3056func TestMetadataUnaryRPC(t *testing.T) {
3057	defer leakcheck.Check(t)
3058	for _, e := range listTestEnv() {
3059		testMetadataUnaryRPC(t, e)
3060	}
3061}
3062
3063func testMetadataUnaryRPC(t *testing.T, e env) {
3064	te := newTest(t, e)
3065	te.startServer(&testServer{security: e.security})
3066	defer te.tearDown()
3067	tc := testpb.NewTestServiceClient(te.clientConn())
3068
3069	const argSize = 2718
3070	const respSize = 314
3071
3072	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3073	if err != nil {
3074		t.Fatal(err)
3075	}
3076
3077	req := &testpb.SimpleRequest{
3078		ResponseType: testpb.PayloadType_COMPRESSABLE,
3079		ResponseSize: respSize,
3080		Payload:      payload,
3081	}
3082	var header, trailer metadata.MD
3083	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3084	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
3085		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3086	}
3087	// Ignore optional response headers that Servers may set:
3088	if header != nil {
3089		delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
3090		delete(header, "date")    // the Date header is also optional
3091		delete(header, "user-agent")
3092		delete(header, "content-type")
3093	}
3094	if !reflect.DeepEqual(header, testMetadata) {
3095		t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
3096	}
3097	if !reflect.DeepEqual(trailer, testTrailerMetadata) {
3098		t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
3099	}
3100}
3101
3102func TestMultipleSetTrailerUnaryRPC(t *testing.T) {
3103	defer leakcheck.Check(t)
3104	for _, e := range listTestEnv() {
3105		testMultipleSetTrailerUnaryRPC(t, e)
3106	}
3107}
3108
3109func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
3110	te := newTest(t, e)
3111	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
3112	defer te.tearDown()
3113	tc := testpb.NewTestServiceClient(te.clientConn())
3114
3115	const (
3116		argSize  = 1
3117		respSize = 1
3118	)
3119	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3120	if err != nil {
3121		t.Fatal(err)
3122	}
3123
3124	req := &testpb.SimpleRequest{
3125		ResponseType: testpb.PayloadType_COMPRESSABLE,
3126		ResponseSize: respSize,
3127		Payload:      payload,
3128	}
3129	var trailer metadata.MD
3130	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3131	if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.FailFast(false)); err != nil {
3132		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3133	}
3134	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
3135	if !reflect.DeepEqual(trailer, expectedTrailer) {
3136		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
3137	}
3138}
3139
3140func TestMultipleSetTrailerStreamingRPC(t *testing.T) {
3141	defer leakcheck.Check(t)
3142	for _, e := range listTestEnv() {
3143		testMultipleSetTrailerStreamingRPC(t, e)
3144	}
3145}
3146
3147func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
3148	te := newTest(t, e)
3149	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
3150	defer te.tearDown()
3151	tc := testpb.NewTestServiceClient(te.clientConn())
3152
3153	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3154	stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
3155	if err != nil {
3156		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3157	}
3158	if err := stream.CloseSend(); err != nil {
3159		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3160	}
3161	if _, err := stream.Recv(); err != io.EOF {
3162		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3163	}
3164
3165	trailer := stream.Trailer()
3166	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
3167	if !reflect.DeepEqual(trailer, expectedTrailer) {
3168		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
3169	}
3170}
3171
3172func TestSetAndSendHeaderUnaryRPC(t *testing.T) {
3173	defer leakcheck.Check(t)
3174	for _, e := range listTestEnv() {
3175		if e.name == "handler-tls" {
3176			continue
3177		}
3178		testSetAndSendHeaderUnaryRPC(t, e)
3179	}
3180}
3181
3182// To test header metadata is sent on SendHeader().
3183func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
3184	te := newTest(t, e)
3185	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
3186	defer te.tearDown()
3187	tc := testpb.NewTestServiceClient(te.clientConn())
3188
3189	const (
3190		argSize  = 1
3191		respSize = 1
3192	)
3193	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3194	if err != nil {
3195		t.Fatal(err)
3196	}
3197
3198	req := &testpb.SimpleRequest{
3199		ResponseType: testpb.PayloadType_COMPRESSABLE,
3200		ResponseSize: respSize,
3201		Payload:      payload,
3202	}
3203	var header metadata.MD
3204	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3205	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
3206		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3207	}
3208	delete(header, "user-agent")
3209	delete(header, "content-type")
3210	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3211	if !reflect.DeepEqual(header, expectedHeader) {
3212		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3213	}
3214}
3215
3216func TestMultipleSetHeaderUnaryRPC(t *testing.T) {
3217	defer leakcheck.Check(t)
3218	for _, e := range listTestEnv() {
3219		if e.name == "handler-tls" {
3220			continue
3221		}
3222		testMultipleSetHeaderUnaryRPC(t, e)
3223	}
3224}
3225
3226// To test header metadata is sent when sending response.
3227func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
3228	te := newTest(t, e)
3229	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3230	defer te.tearDown()
3231	tc := testpb.NewTestServiceClient(te.clientConn())
3232
3233	const (
3234		argSize  = 1
3235		respSize = 1
3236	)
3237	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3238	if err != nil {
3239		t.Fatal(err)
3240	}
3241
3242	req := &testpb.SimpleRequest{
3243		ResponseType: testpb.PayloadType_COMPRESSABLE,
3244		ResponseSize: respSize,
3245		Payload:      payload,
3246	}
3247
3248	var header metadata.MD
3249	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3250	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err != nil {
3251		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3252	}
3253	delete(header, "user-agent")
3254	delete(header, "content-type")
3255	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3256	if !reflect.DeepEqual(header, expectedHeader) {
3257		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3258	}
3259}
3260
3261func TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
3262	defer leakcheck.Check(t)
3263	for _, e := range listTestEnv() {
3264		if e.name == "handler-tls" {
3265			continue
3266		}
3267		testMultipleSetHeaderUnaryRPCError(t, e)
3268	}
3269}
3270
3271// To test header metadata is sent when sending status.
3272func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
3273	te := newTest(t, e)
3274	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3275	defer te.tearDown()
3276	tc := testpb.NewTestServiceClient(te.clientConn())
3277
3278	const (
3279		argSize  = 1
3280		respSize = -1 // Invalid respSize to make RPC fail.
3281	)
3282	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3283	if err != nil {
3284		t.Fatal(err)
3285	}
3286
3287	req := &testpb.SimpleRequest{
3288		ResponseType: testpb.PayloadType_COMPRESSABLE,
3289		ResponseSize: respSize,
3290		Payload:      payload,
3291	}
3292	var header metadata.MD
3293	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3294	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.FailFast(false)); err == nil {
3295		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
3296	}
3297	delete(header, "user-agent")
3298	delete(header, "content-type")
3299	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3300	if !reflect.DeepEqual(header, expectedHeader) {
3301		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3302	}
3303}
3304
3305func TestSetAndSendHeaderStreamingRPC(t *testing.T) {
3306	defer leakcheck.Check(t)
3307	for _, e := range listTestEnv() {
3308		if e.name == "handler-tls" {
3309			continue
3310		}
3311		testSetAndSendHeaderStreamingRPC(t, e)
3312	}
3313}
3314
3315// To test header metadata is sent on SendHeader().
3316func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
3317	te := newTest(t, e)
3318	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
3319	defer te.tearDown()
3320	tc := testpb.NewTestServiceClient(te.clientConn())
3321
3322	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3323	stream, err := tc.FullDuplexCall(ctx)
3324	if err != nil {
3325		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3326	}
3327	if err := stream.CloseSend(); err != nil {
3328		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3329	}
3330	if _, err := stream.Recv(); err != io.EOF {
3331		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3332	}
3333
3334	header, err := stream.Header()
3335	if err != nil {
3336		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3337	}
3338	delete(header, "user-agent")
3339	delete(header, "content-type")
3340	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3341	if !reflect.DeepEqual(header, expectedHeader) {
3342		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3343	}
3344}
3345
3346func TestMultipleSetHeaderStreamingRPC(t *testing.T) {
3347	defer leakcheck.Check(t)
3348	for _, e := range listTestEnv() {
3349		if e.name == "handler-tls" {
3350			continue
3351		}
3352		testMultipleSetHeaderStreamingRPC(t, e)
3353	}
3354}
3355
3356// To test header metadata is sent when sending response.
3357func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
3358	te := newTest(t, e)
3359	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3360	defer te.tearDown()
3361	tc := testpb.NewTestServiceClient(te.clientConn())
3362
3363	const (
3364		argSize  = 1
3365		respSize = 1
3366	)
3367	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3368	stream, err := tc.FullDuplexCall(ctx)
3369	if err != nil {
3370		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3371	}
3372
3373	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3374	if err != nil {
3375		t.Fatal(err)
3376	}
3377
3378	req := &testpb.StreamingOutputCallRequest{
3379		ResponseType: testpb.PayloadType_COMPRESSABLE,
3380		ResponseParameters: []*testpb.ResponseParameters{
3381			{Size: respSize},
3382		},
3383		Payload: payload,
3384	}
3385	if err := stream.Send(req); err != nil {
3386		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3387	}
3388	if _, err := stream.Recv(); err != nil {
3389		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3390	}
3391	if err := stream.CloseSend(); err != nil {
3392		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3393	}
3394	if _, err := stream.Recv(); err != io.EOF {
3395		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3396	}
3397
3398	header, err := stream.Header()
3399	if err != nil {
3400		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3401	}
3402	delete(header, "user-agent")
3403	delete(header, "content-type")
3404	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3405	if !reflect.DeepEqual(header, expectedHeader) {
3406		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3407	}
3408
3409}
3410
3411func TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
3412	defer leakcheck.Check(t)
3413	for _, e := range listTestEnv() {
3414		if e.name == "handler-tls" {
3415			continue
3416		}
3417		testMultipleSetHeaderStreamingRPCError(t, e)
3418	}
3419}
3420
3421// To test header metadata is sent when sending status.
3422func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
3423	te := newTest(t, e)
3424	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3425	defer te.tearDown()
3426	tc := testpb.NewTestServiceClient(te.clientConn())
3427
3428	const (
3429		argSize  = 1
3430		respSize = -1
3431	)
3432	ctx, cancel := context.WithCancel(context.Background())
3433	defer cancel()
3434	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
3435	stream, err := tc.FullDuplexCall(ctx)
3436	if err != nil {
3437		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3438	}
3439
3440	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3441	if err != nil {
3442		t.Fatal(err)
3443	}
3444
3445	req := &testpb.StreamingOutputCallRequest{
3446		ResponseType: testpb.PayloadType_COMPRESSABLE,
3447		ResponseParameters: []*testpb.ResponseParameters{
3448			{Size: respSize},
3449		},
3450		Payload: payload,
3451	}
3452	if err := stream.Send(req); err != nil {
3453		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3454	}
3455	if _, err := stream.Recv(); err == nil {
3456		t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
3457	}
3458
3459	header, err := stream.Header()
3460	if err != nil {
3461		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3462	}
3463	delete(header, "user-agent")
3464	delete(header, "content-type")
3465	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3466	if !reflect.DeepEqual(header, expectedHeader) {
3467		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3468	}
3469	if err := stream.CloseSend(); err != nil {
3470		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3471	}
3472}
3473
3474// TestMalformedHTTP2Metedata verfies the returned error when the client
3475// sends an illegal metadata.
3476func TestMalformedHTTP2Metadata(t *testing.T) {
3477	defer leakcheck.Check(t)
3478	for _, e := range listTestEnv() {
3479		if e.name == "handler-tls" {
3480			// Failed with "server stops accepting new RPCs".
3481			// Server stops accepting new RPCs when the client sends an illegal http2 header.
3482			continue
3483		}
3484		testMalformedHTTP2Metadata(t, e)
3485	}
3486}
3487
3488func testMalformedHTTP2Metadata(t *testing.T, e env) {
3489	te := newTest(t, e)
3490	te.startServer(&testServer{security: e.security})
3491	defer te.tearDown()
3492	tc := testpb.NewTestServiceClient(te.clientConn())
3493
3494	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718)
3495	if err != nil {
3496		t.Fatal(err)
3497	}
3498
3499	req := &testpb.SimpleRequest{
3500		ResponseType: testpb.PayloadType_COMPRESSABLE,
3501		ResponseSize: 314,
3502		Payload:      payload,
3503	}
3504	ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata)
3505	if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal {
3506		t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
3507	}
3508}
3509
3510func TestTransparentRetry(t *testing.T) {
3511	defer leakcheck.Check(t)
3512	for _, e := range listTestEnv() {
3513		if e.name == "handler-tls" {
3514			// Fails with RST_STREAM / FLOW_CONTROL_ERROR
3515			continue
3516		}
3517		testTransparentRetry(t, e)
3518	}
3519}
3520
3521// This test makes sure RPCs are retried times when they receive a RST_STREAM
3522// with the REFUSED_STREAM error code, which the InTapHandle provokes.
3523func testTransparentRetry(t *testing.T, e env) {
3524	te := newTest(t, e)
3525	attempts := 0
3526	successAttempt := 2
3527	te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
3528		attempts++
3529		if attempts < successAttempt {
3530			return nil, errors.New("not now")
3531		}
3532		return ctx, nil
3533	}
3534	te.startServer(&testServer{security: e.security})
3535	defer te.tearDown()
3536
3537	cc := te.clientConn()
3538	tsc := testpb.NewTestServiceClient(cc)
3539	testCases := []struct {
3540		successAttempt int
3541		failFast       bool
3542		errCode        codes.Code
3543	}{{
3544		successAttempt: 1,
3545	}, {
3546		successAttempt: 2,
3547	}, {
3548		successAttempt: 3,
3549		errCode:        codes.Unavailable,
3550	}, {
3551		successAttempt: 1,
3552		failFast:       true,
3553	}, {
3554		successAttempt: 2,
3555		failFast:       true,
3556		errCode:        codes.Unavailable, // We won't retry on fail fast.
3557	}}
3558	for _, tc := range testCases {
3559		attempts = 0
3560		successAttempt = tc.successAttempt
3561
3562		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
3563		_, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(tc.failFast))
3564		cancel()
3565		if status.Code(err) != tc.errCode {
3566			t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode)
3567		}
3568	}
3569}
3570
3571func TestCancel(t *testing.T) {
3572	defer leakcheck.Check(t)
3573	for _, e := range listTestEnv() {
3574		testCancel(t, e)
3575	}
3576}
3577
3578func testCancel(t *testing.T, e env) {
3579	te := newTest(t, e)
3580	te.declareLogNoise("grpc: the client connection is closing; please retry")
3581	te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
3582	defer te.tearDown()
3583
3584	cc := te.clientConn()
3585	tc := testpb.NewTestServiceClient(cc)
3586
3587	const argSize = 2718
3588	const respSize = 314
3589
3590	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3591	if err != nil {
3592		t.Fatal(err)
3593	}
3594
3595	req := &testpb.SimpleRequest{
3596		ResponseType: testpb.PayloadType_COMPRESSABLE,
3597		ResponseSize: respSize,
3598		Payload:      payload,
3599	}
3600	ctx, cancel := context.WithCancel(context.Background())
3601	time.AfterFunc(1*time.Millisecond, cancel)
3602	if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled {
3603		t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
3604	}
3605	awaitNewConnLogOutput()
3606}
3607
3608func TestCancelNoIO(t *testing.T) {
3609	defer leakcheck.Check(t)
3610	for _, e := range listTestEnv() {
3611		testCancelNoIO(t, e)
3612	}
3613}
3614
3615func testCancelNoIO(t *testing.T, e env) {
3616	te := newTest(t, e)
3617	te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
3618	te.maxStream = 1 // Only allows 1 live stream per server transport.
3619	te.startServer(&testServer{security: e.security})
3620	defer te.tearDown()
3621
3622	cc := te.clientConn()
3623	tc := testpb.NewTestServiceClient(cc)
3624
3625	// Start one blocked RPC for which we'll never send streaming
3626	// input. This will consume the 1 maximum concurrent streams,
3627	// causing future RPCs to hang.
3628	ctx, cancelFirst := context.WithCancel(context.Background())
3629	_, err := tc.StreamingInputCall(ctx)
3630	if err != nil {
3631		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3632	}
3633
3634	// Loop until the ClientConn receives the initial settings
3635	// frame from the server, notifying it about the maximum
3636	// concurrent streams. We know when it's received it because
3637	// an RPC will fail with codes.DeadlineExceeded instead of
3638	// succeeding.
3639	// TODO(bradfitz): add internal test hook for this (Issue 534)
3640	for {
3641		ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond)
3642		_, err := tc.StreamingInputCall(ctx)
3643		cancelSecond()
3644		if err == nil {
3645			continue
3646		}
3647		if status.Code(err) == codes.DeadlineExceeded {
3648			break
3649		}
3650		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3651	}
3652	// If there are any RPCs in flight before the client receives
3653	// the max streams setting, let them be expired.
3654	// TODO(bradfitz): add internal test hook for this (Issue 534)
3655	time.Sleep(50 * time.Millisecond)
3656
3657	go func() {
3658		time.Sleep(50 * time.Millisecond)
3659		cancelFirst()
3660	}()
3661
3662	// This should be blocked until the 1st is canceled, then succeed.
3663	ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond)
3664	if _, err := tc.StreamingInputCall(ctx); err != nil {
3665		t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3666	}
3667	cancelThird()
3668}
3669
3670// The following tests the gRPC streaming RPC implementations.
3671// TODO(zhaoq): Have better coverage on error cases.
3672var (
3673	reqSizes  = []int{27182, 8, 1828, 45904}
3674	respSizes = []int{31415, 9, 2653, 58979}
3675)
3676
3677func TestNoService(t *testing.T) {
3678	defer leakcheck.Check(t)
3679	for _, e := range listTestEnv() {
3680		testNoService(t, e)
3681	}
3682}
3683
3684func testNoService(t *testing.T, e env) {
3685	te := newTest(t, e)
3686	te.startServer(nil)
3687	defer te.tearDown()
3688
3689	cc := te.clientConn()
3690	tc := testpb.NewTestServiceClient(cc)
3691
3692	stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false))
3693	if err != nil {
3694		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3695	}
3696	if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented {
3697		t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
3698	}
3699}
3700
3701func TestPingPong(t *testing.T) {
3702	defer leakcheck.Check(t)
3703	for _, e := range listTestEnv() {
3704		testPingPong(t, e)
3705	}
3706}
3707
3708func testPingPong(t *testing.T, e env) {
3709	te := newTest(t, e)
3710	te.startServer(&testServer{security: e.security})
3711	defer te.tearDown()
3712	tc := testpb.NewTestServiceClient(te.clientConn())
3713
3714	stream, err := tc.FullDuplexCall(te.ctx)
3715	if err != nil {
3716		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3717	}
3718	var index int
3719	for index < len(reqSizes) {
3720		respParam := []*testpb.ResponseParameters{
3721			{
3722				Size: int32(respSizes[index]),
3723			},
3724		}
3725
3726		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3727		if err != nil {
3728			t.Fatal(err)
3729		}
3730
3731		req := &testpb.StreamingOutputCallRequest{
3732			ResponseType:       testpb.PayloadType_COMPRESSABLE,
3733			ResponseParameters: respParam,
3734			Payload:            payload,
3735		}
3736		if err := stream.Send(req); err != nil {
3737			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3738		}
3739		reply, err := stream.Recv()
3740		if err != nil {
3741			t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3742		}
3743		pt := reply.GetPayload().GetType()
3744		if pt != testpb.PayloadType_COMPRESSABLE {
3745			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3746		}
3747		size := len(reply.GetPayload().GetBody())
3748		if size != int(respSizes[index]) {
3749			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3750		}
3751		index++
3752	}
3753	if err := stream.CloseSend(); err != nil {
3754		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3755	}
3756	if _, err := stream.Recv(); err != io.EOF {
3757		t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
3758	}
3759}
3760
3761func TestMetadataStreamingRPC(t *testing.T) {
3762	defer leakcheck.Check(t)
3763	for _, e := range listTestEnv() {
3764		testMetadataStreamingRPC(t, e)
3765	}
3766}
3767
3768func testMetadataStreamingRPC(t *testing.T, e env) {
3769	te := newTest(t, e)
3770	te.startServer(&testServer{security: e.security})
3771	defer te.tearDown()
3772	tc := testpb.NewTestServiceClient(te.clientConn())
3773
3774	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3775	stream, err := tc.FullDuplexCall(ctx)
3776	if err != nil {
3777		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3778	}
3779	go func() {
3780		headerMD, err := stream.Header()
3781		if e.security == "tls" {
3782			delete(headerMD, "transport_security_type")
3783		}
3784		delete(headerMD, "trailer") // ignore if present
3785		delete(headerMD, "user-agent")
3786		delete(headerMD, "content-type")
3787		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3788			t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3789		}
3790		// test the cached value.
3791		headerMD, err = stream.Header()
3792		delete(headerMD, "trailer") // ignore if present
3793		delete(headerMD, "user-agent")
3794		delete(headerMD, "content-type")
3795		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3796			t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3797		}
3798		err = func() error {
3799			for index := 0; index < len(reqSizes); index++ {
3800				respParam := []*testpb.ResponseParameters{
3801					{
3802						Size: int32(respSizes[index]),
3803					},
3804				}
3805
3806				payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3807				if err != nil {
3808					return err
3809				}
3810
3811				req := &testpb.StreamingOutputCallRequest{
3812					ResponseType:       testpb.PayloadType_COMPRESSABLE,
3813					ResponseParameters: respParam,
3814					Payload:            payload,
3815				}
3816				if err := stream.Send(req); err != nil {
3817					return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3818				}
3819			}
3820			return nil
3821		}()
3822		// Tell the server we're done sending args.
3823		stream.CloseSend()
3824		if err != nil {
3825			t.Error(err)
3826		}
3827	}()
3828	for {
3829		if _, err := stream.Recv(); err != nil {
3830			break
3831		}
3832	}
3833	trailerMD := stream.Trailer()
3834	if !reflect.DeepEqual(testTrailerMetadata, trailerMD) {
3835		t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata)
3836	}
3837}
3838
3839func TestServerStreaming(t *testing.T) {
3840	defer leakcheck.Check(t)
3841	for _, e := range listTestEnv() {
3842		testServerStreaming(t, e)
3843	}
3844}
3845
3846func testServerStreaming(t *testing.T, e env) {
3847	te := newTest(t, e)
3848	te.startServer(&testServer{security: e.security})
3849	defer te.tearDown()
3850	tc := testpb.NewTestServiceClient(te.clientConn())
3851
3852	respParam := make([]*testpb.ResponseParameters, len(respSizes))
3853	for i, s := range respSizes {
3854		respParam[i] = &testpb.ResponseParameters{
3855			Size: int32(s),
3856		}
3857	}
3858	req := &testpb.StreamingOutputCallRequest{
3859		ResponseType:       testpb.PayloadType_COMPRESSABLE,
3860		ResponseParameters: respParam,
3861	}
3862	stream, err := tc.StreamingOutputCall(context.Background(), req)
3863	if err != nil {
3864		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3865	}
3866	var rpcStatus error
3867	var respCnt int
3868	var index int
3869	for {
3870		reply, err := stream.Recv()
3871		if err != nil {
3872			rpcStatus = err
3873			break
3874		}
3875		pt := reply.GetPayload().GetType()
3876		if pt != testpb.PayloadType_COMPRESSABLE {
3877			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3878		}
3879		size := len(reply.GetPayload().GetBody())
3880		if size != int(respSizes[index]) {
3881			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3882		}
3883		index++
3884		respCnt++
3885	}
3886	if rpcStatus != io.EOF {
3887		t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
3888	}
3889	if respCnt != len(respSizes) {
3890		t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
3891	}
3892}
3893
3894func TestFailedServerStreaming(t *testing.T) {
3895	defer leakcheck.Check(t)
3896	for _, e := range listTestEnv() {
3897		testFailedServerStreaming(t, e)
3898	}
3899}
3900
3901func testFailedServerStreaming(t *testing.T, e env) {
3902	te := newTest(t, e)
3903	te.userAgent = failAppUA
3904	te.startServer(&testServer{security: e.security})
3905	defer te.tearDown()
3906	tc := testpb.NewTestServiceClient(te.clientConn())
3907
3908	respParam := make([]*testpb.ResponseParameters, len(respSizes))
3909	for i, s := range respSizes {
3910		respParam[i] = &testpb.ResponseParameters{
3911			Size: int32(s),
3912		}
3913	}
3914	req := &testpb.StreamingOutputCallRequest{
3915		ResponseType:       testpb.PayloadType_COMPRESSABLE,
3916		ResponseParameters: respParam,
3917	}
3918	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3919	stream, err := tc.StreamingOutputCall(ctx, req)
3920	if err != nil {
3921		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3922	}
3923	wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA)
3924	if _, err := stream.Recv(); !reflect.DeepEqual(err, wantErr) {
3925		t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
3926	}
3927}
3928
3929// concurrentSendServer is a TestServiceServer whose
3930// StreamingOutputCall makes ten serial Send calls, sending payloads
3931// "0".."9", inclusive.  TestServerStreamingConcurrent verifies they
3932// were received in the correct order, and that there were no races.
3933//
3934// All other TestServiceServer methods crash if called.
3935type concurrentSendServer struct {
3936	testpb.TestServiceServer
3937}
3938
3939func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
3940	for i := 0; i < 10; i++ {
3941		stream.Send(&testpb.StreamingOutputCallResponse{
3942			Payload: &testpb.Payload{
3943				Body: []byte{'0' + uint8(i)},
3944			},
3945		})
3946	}
3947	return nil
3948}
3949
3950// Tests doing a bunch of concurrent streaming output calls.
3951func TestServerStreamingConcurrent(t *testing.T) {
3952	defer leakcheck.Check(t)
3953	for _, e := range listTestEnv() {
3954		testServerStreamingConcurrent(t, e)
3955	}
3956}
3957
3958func testServerStreamingConcurrent(t *testing.T, e env) {
3959	te := newTest(t, e)
3960	te.startServer(concurrentSendServer{})
3961	defer te.tearDown()
3962
3963	cc := te.clientConn()
3964	tc := testpb.NewTestServiceClient(cc)
3965
3966	doStreamingCall := func() {
3967		req := &testpb.StreamingOutputCallRequest{}
3968		stream, err := tc.StreamingOutputCall(context.Background(), req)
3969		if err != nil {
3970			t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3971			return
3972		}
3973		var ngot int
3974		var buf bytes.Buffer
3975		for {
3976			reply, err := stream.Recv()
3977			if err == io.EOF {
3978				break
3979			}
3980			if err != nil {
3981				t.Fatal(err)
3982			}
3983			ngot++
3984			if buf.Len() > 0 {
3985				buf.WriteByte(',')
3986			}
3987			buf.Write(reply.GetPayload().GetBody())
3988		}
3989		if want := 10; ngot != want {
3990			t.Errorf("Got %d replies, want %d", ngot, want)
3991		}
3992		if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
3993			t.Errorf("Got replies %q; want %q", got, want)
3994		}
3995	}
3996
3997	var wg sync.WaitGroup
3998	for i := 0; i < 20; i++ {
3999		wg.Add(1)
4000		go func() {
4001			defer wg.Done()
4002			doStreamingCall()
4003		}()
4004	}
4005	wg.Wait()
4006
4007}
4008
4009func generatePayloadSizes() [][]int {
4010	reqSizes := [][]int{
4011		{27182, 8, 1828, 45904},
4012	}
4013
4014	num8KPayloads := 1024
4015	eightKPayloads := []int{}
4016	for i := 0; i < num8KPayloads; i++ {
4017		eightKPayloads = append(eightKPayloads, (1 << 13))
4018	}
4019	reqSizes = append(reqSizes, eightKPayloads)
4020
4021	num2MPayloads := 8
4022	twoMPayloads := []int{}
4023	for i := 0; i < num2MPayloads; i++ {
4024		twoMPayloads = append(twoMPayloads, (1 << 21))
4025	}
4026	reqSizes = append(reqSizes, twoMPayloads)
4027
4028	return reqSizes
4029}
4030
4031func TestClientStreaming(t *testing.T) {
4032	defer leakcheck.Check(t)
4033	for _, s := range generatePayloadSizes() {
4034		for _, e := range listTestEnv() {
4035			testClientStreaming(t, e, s)
4036		}
4037	}
4038}
4039
4040func testClientStreaming(t *testing.T, e env, sizes []int) {
4041	te := newTest(t, e)
4042	te.startServer(&testServer{security: e.security})
4043	defer te.tearDown()
4044	tc := testpb.NewTestServiceClient(te.clientConn())
4045
4046	ctx, cancel := context.WithTimeout(te.ctx, time.Second*30)
4047	defer cancel()
4048	stream, err := tc.StreamingInputCall(ctx)
4049	if err != nil {
4050		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
4051	}
4052
4053	var sum int
4054	for _, s := range sizes {
4055		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
4056		if err != nil {
4057			t.Fatal(err)
4058		}
4059
4060		req := &testpb.StreamingInputCallRequest{
4061			Payload: payload,
4062		}
4063		if err := stream.Send(req); err != nil {
4064			t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
4065		}
4066		sum += s
4067	}
4068	reply, err := stream.CloseAndRecv()
4069	if err != nil {
4070		t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
4071	}
4072	if reply.GetAggregatedPayloadSize() != int32(sum) {
4073		t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
4074	}
4075}
4076
4077func TestClientStreamingError(t *testing.T) {
4078	defer leakcheck.Check(t)
4079	for _, e := range listTestEnv() {
4080		if e.name == "handler-tls" {
4081			continue
4082		}
4083		testClientStreamingError(t, e)
4084	}
4085}
4086
4087func testClientStreamingError(t *testing.T, e env) {
4088	te := newTest(t, e)
4089	te.startServer(&testServer{security: e.security, earlyFail: true})
4090	defer te.tearDown()
4091	tc := testpb.NewTestServiceClient(te.clientConn())
4092
4093	stream, err := tc.StreamingInputCall(te.ctx)
4094	if err != nil {
4095		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
4096	}
4097	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
4098	if err != nil {
4099		t.Fatal(err)
4100	}
4101
4102	req := &testpb.StreamingInputCallRequest{
4103		Payload: payload,
4104	}
4105	// The 1st request should go through.
4106	if err := stream.Send(req); err != nil {
4107		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
4108	}
4109	for {
4110		if err := stream.Send(req); err != io.EOF {
4111			continue
4112		}
4113		if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound {
4114			t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
4115		}
4116		break
4117	}
4118}
4119
4120func TestExceedMaxStreamsLimit(t *testing.T) {
4121	defer leakcheck.Check(t)
4122	for _, e := range listTestEnv() {
4123		testExceedMaxStreamsLimit(t, e)
4124	}
4125}
4126
4127func testExceedMaxStreamsLimit(t *testing.T, e env) {
4128	te := newTest(t, e)
4129	te.declareLogNoise(
4130		"http2Client.notifyError got notified that the client transport was broken",
4131		"Conn.resetTransport failed to create client transport",
4132		"grpc: the connection is closing",
4133	)
4134	te.maxStream = 1 // Only allows 1 live stream per server transport.
4135	te.startServer(&testServer{security: e.security})
4136	defer te.tearDown()
4137
4138	cc := te.clientConn()
4139	tc := testpb.NewTestServiceClient(cc)
4140
4141	_, err := tc.StreamingInputCall(te.ctx)
4142	if err != nil {
4143		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
4144	}
4145	// Loop until receiving the new max stream setting from the server.
4146	for {
4147		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
4148		defer cancel()
4149		_, err := tc.StreamingInputCall(ctx)
4150		if err == nil {
4151			time.Sleep(50 * time.Millisecond)
4152			continue
4153		}
4154		if status.Code(err) == codes.DeadlineExceeded {
4155			break
4156		}
4157		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
4158	}
4159}
4160
4161func TestStreamsQuotaRecovery(t *testing.T) {
4162	defer leakcheck.Check(t)
4163	for _, e := range listTestEnv() {
4164		testStreamsQuotaRecovery(t, e)
4165	}
4166}
4167
4168func testStreamsQuotaRecovery(t *testing.T, e env) {
4169	te := newTest(t, e)
4170	te.declareLogNoise(
4171		"http2Client.notifyError got notified that the client transport was broken",
4172		"Conn.resetTransport failed to create client transport",
4173		"grpc: the connection is closing",
4174	)
4175	te.maxStream = 1 // Allows 1 live stream.
4176	te.startServer(&testServer{security: e.security})
4177	defer te.tearDown()
4178
4179	cc := te.clientConn()
4180	tc := testpb.NewTestServiceClient(cc)
4181	ctx, cancel := context.WithCancel(context.Background())
4182	defer cancel()
4183	if _, err := tc.StreamingInputCall(ctx); err != nil {
4184		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err)
4185	}
4186	// Loop until the new max stream setting is effective.
4187	for {
4188		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
4189		_, err := tc.StreamingInputCall(ctx)
4190		cancel()
4191		if err == nil {
4192			time.Sleep(5 * time.Millisecond)
4193			continue
4194		}
4195		if status.Code(err) == codes.DeadlineExceeded {
4196			break
4197		}
4198		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded)
4199	}
4200
4201	var wg sync.WaitGroup
4202	for i := 0; i < 10; i++ {
4203		wg.Add(1)
4204		go func() {
4205			defer wg.Done()
4206			payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
4207			if err != nil {
4208				t.Error(err)
4209				return
4210			}
4211			req := &testpb.SimpleRequest{
4212				ResponseType: testpb.PayloadType_COMPRESSABLE,
4213				ResponseSize: 1592,
4214				Payload:      payload,
4215			}
4216			// No rpc should go through due to the max streams limit.
4217			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
4218			defer cancel()
4219			if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
4220				t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
4221			}
4222		}()
4223	}
4224	wg.Wait()
4225
4226	cancel()
4227	// A new stream should be allowed after canceling the first one.
4228	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
4229	defer cancel()
4230	if _, err := tc.StreamingInputCall(ctx); err != nil {
4231		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil)
4232	}
4233}
4234
4235func TestCompressServerHasNoSupport(t *testing.T) {
4236	defer leakcheck.Check(t)
4237	for _, e := range listTestEnv() {
4238		testCompressServerHasNoSupport(t, e)
4239	}
4240}
4241
4242func testCompressServerHasNoSupport(t *testing.T, e env) {
4243	te := newTest(t, e)
4244	te.serverCompression = false
4245	te.clientCompression = false
4246	te.clientNopCompression = true
4247	te.startServer(&testServer{security: e.security})
4248	defer te.tearDown()
4249	tc := testpb.NewTestServiceClient(te.clientConn())
4250
4251	const argSize = 271828
4252	const respSize = 314159
4253	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
4254	if err != nil {
4255		t.Fatal(err)
4256	}
4257	req := &testpb.SimpleRequest{
4258		ResponseType: testpb.PayloadType_COMPRESSABLE,
4259		ResponseSize: respSize,
4260		Payload:      payload,
4261	}
4262	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.Unimplemented {
4263		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented)
4264	}
4265	// Streaming RPC
4266	stream, err := tc.FullDuplexCall(context.Background())
4267	if err != nil {
4268		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4269	}
4270	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Unimplemented {
4271		t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented)
4272	}
4273}
4274
4275func TestCompressOK(t *testing.T) {
4276	defer leakcheck.Check(t)
4277	for _, e := range listTestEnv() {
4278		testCompressOK(t, e)
4279	}
4280}
4281
4282func testCompressOK(t *testing.T, e env) {
4283	te := newTest(t, e)
4284	te.serverCompression = true
4285	te.clientCompression = true
4286	te.startServer(&testServer{security: e.security})
4287	defer te.tearDown()
4288	tc := testpb.NewTestServiceClient(te.clientConn())
4289
4290	// Unary call
4291	const argSize = 271828
4292	const respSize = 314159
4293	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
4294	if err != nil {
4295		t.Fatal(err)
4296	}
4297	req := &testpb.SimpleRequest{
4298		ResponseType: testpb.PayloadType_COMPRESSABLE,
4299		ResponseSize: respSize,
4300		Payload:      payload,
4301	}
4302	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
4303	if _, err := tc.UnaryCall(ctx, req); err != nil {
4304		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
4305	}
4306	// Streaming RPC
4307	ctx, cancel := context.WithCancel(context.Background())
4308	defer cancel()
4309	stream, err := tc.FullDuplexCall(ctx)
4310	if err != nil {
4311		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4312	}
4313	respParam := []*testpb.ResponseParameters{
4314		{
4315			Size: 31415,
4316		},
4317	}
4318	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
4319	if err != nil {
4320		t.Fatal(err)
4321	}
4322	sreq := &testpb.StreamingOutputCallRequest{
4323		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4324		ResponseParameters: respParam,
4325		Payload:            payload,
4326	}
4327	if err := stream.Send(sreq); err != nil {
4328		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
4329	}
4330	stream.CloseSend()
4331	if _, err := stream.Recv(); err != nil {
4332		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
4333	}
4334	if _, err := stream.Recv(); err != io.EOF {
4335		t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err)
4336	}
4337}
4338
4339func TestIdentityEncoding(t *testing.T) {
4340	defer leakcheck.Check(t)
4341	for _, e := range listTestEnv() {
4342		testIdentityEncoding(t, e)
4343	}
4344}
4345
4346func testIdentityEncoding(t *testing.T, e env) {
4347	te := newTest(t, e)
4348	te.startServer(&testServer{security: e.security})
4349	defer te.tearDown()
4350	tc := testpb.NewTestServiceClient(te.clientConn())
4351
4352	// Unary call
4353	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 5)
4354	if err != nil {
4355		t.Fatal(err)
4356	}
4357	req := &testpb.SimpleRequest{
4358		ResponseType: testpb.PayloadType_COMPRESSABLE,
4359		ResponseSize: 10,
4360		Payload:      payload,
4361	}
4362	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
4363	if _, err := tc.UnaryCall(ctx, req); err != nil {
4364		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
4365	}
4366	// Streaming RPC
4367	ctx, cancel := context.WithCancel(context.Background())
4368	defer cancel()
4369	stream, err := tc.FullDuplexCall(ctx, grpc.UseCompressor("identity"))
4370	if err != nil {
4371		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4372	}
4373	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
4374	if err != nil {
4375		t.Fatal(err)
4376	}
4377	sreq := &testpb.StreamingOutputCallRequest{
4378		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4379		ResponseParameters: []*testpb.ResponseParameters{{Size: 10}},
4380		Payload:            payload,
4381	}
4382	if err := stream.Send(sreq); err != nil {
4383		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
4384	}
4385	stream.CloseSend()
4386	if _, err := stream.Recv(); err != nil {
4387		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
4388	}
4389	if _, err := stream.Recv(); err != io.EOF {
4390		t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err)
4391	}
4392}
4393
4394func TestUnaryClientInterceptor(t *testing.T) {
4395	defer leakcheck.Check(t)
4396	for _, e := range listTestEnv() {
4397		testUnaryClientInterceptor(t, e)
4398	}
4399}
4400
4401func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
4402	err := invoker(ctx, method, req, reply, cc, opts...)
4403	if err == nil {
4404		return status.Error(codes.NotFound, "")
4405	}
4406	return err
4407}
4408
4409func testUnaryClientInterceptor(t *testing.T, e env) {
4410	te := newTest(t, e)
4411	te.userAgent = testAppUA
4412	te.unaryClientInt = failOkayRPC
4413	te.startServer(&testServer{security: e.security})
4414	defer te.tearDown()
4415
4416	tc := testpb.NewTestServiceClient(te.clientConn())
4417	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.NotFound {
4418		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound)
4419	}
4420}
4421
4422func TestStreamClientInterceptor(t *testing.T) {
4423	defer leakcheck.Check(t)
4424	for _, e := range listTestEnv() {
4425		testStreamClientInterceptor(t, e)
4426	}
4427}
4428
4429func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
4430	s, err := streamer(ctx, desc, cc, method, opts...)
4431	if err == nil {
4432		return nil, status.Error(codes.NotFound, "")
4433	}
4434	return s, nil
4435}
4436
4437func testStreamClientInterceptor(t *testing.T, e env) {
4438	te := newTest(t, e)
4439	te.streamClientInt = failOkayStream
4440	te.startServer(&testServer{security: e.security})
4441	defer te.tearDown()
4442
4443	tc := testpb.NewTestServiceClient(te.clientConn())
4444	respParam := []*testpb.ResponseParameters{
4445		{
4446			Size: int32(1),
4447		},
4448	}
4449	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
4450	if err != nil {
4451		t.Fatal(err)
4452	}
4453	req := &testpb.StreamingOutputCallRequest{
4454		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4455		ResponseParameters: respParam,
4456		Payload:            payload,
4457	}
4458	if _, err := tc.StreamingOutputCall(context.Background(), req); status.Code(err) != codes.NotFound {
4459		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound)
4460	}
4461}
4462
4463func TestUnaryServerInterceptor(t *testing.T) {
4464	defer leakcheck.Check(t)
4465	for _, e := range listTestEnv() {
4466		testUnaryServerInterceptor(t, e)
4467	}
4468}
4469
4470func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
4471	return nil, status.Error(codes.PermissionDenied, "")
4472}
4473
4474func testUnaryServerInterceptor(t *testing.T, e env) {
4475	te := newTest(t, e)
4476	te.unaryServerInt = errInjector
4477	te.startServer(&testServer{security: e.security})
4478	defer te.tearDown()
4479
4480	tc := testpb.NewTestServiceClient(te.clientConn())
4481	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.PermissionDenied {
4482		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
4483	}
4484}
4485
4486func TestStreamServerInterceptor(t *testing.T) {
4487	defer leakcheck.Check(t)
4488	for _, e := range listTestEnv() {
4489		// TODO(bradfitz): Temporarily skip this env due to #619.
4490		if e.name == "handler-tls" {
4491			continue
4492		}
4493		testStreamServerInterceptor(t, e)
4494	}
4495}
4496
4497func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
4498	if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" {
4499		return handler(srv, ss)
4500	}
4501	// Reject the other methods.
4502	return status.Error(codes.PermissionDenied, "")
4503}
4504
4505func testStreamServerInterceptor(t *testing.T, e env) {
4506	te := newTest(t, e)
4507	te.streamServerInt = fullDuplexOnly
4508	te.startServer(&testServer{security: e.security})
4509	defer te.tearDown()
4510
4511	tc := testpb.NewTestServiceClient(te.clientConn())
4512	respParam := []*testpb.ResponseParameters{
4513		{
4514			Size: int32(1),
4515		},
4516	}
4517	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
4518	if err != nil {
4519		t.Fatal(err)
4520	}
4521	req := &testpb.StreamingOutputCallRequest{
4522		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4523		ResponseParameters: respParam,
4524		Payload:            payload,
4525	}
4526	s1, err := tc.StreamingOutputCall(context.Background(), req)
4527	if err != nil {
4528		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
4529	}
4530	if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied {
4531		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
4532	}
4533	s2, err := tc.FullDuplexCall(context.Background())
4534	if err != nil {
4535		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4536	}
4537	if err := s2.Send(req); err != nil {
4538		t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err)
4539	}
4540	if _, err := s2.Recv(); err != nil {
4541		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err)
4542	}
4543}
4544
4545// funcServer implements methods of TestServiceServer using funcs,
4546// similar to an http.HandlerFunc.
4547// Any unimplemented method will crash. Tests implement the method(s)
4548// they need.
4549type funcServer struct {
4550	testpb.TestServiceServer
4551	unaryCall          func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
4552	streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error
4553	fullDuplexCall     func(stream testpb.TestService_FullDuplexCallServer) error
4554}
4555
4556func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4557	return s.unaryCall(ctx, in)
4558}
4559
4560func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
4561	return s.streamingInputCall(stream)
4562}
4563
4564func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
4565	return s.fullDuplexCall(stream)
4566}
4567
4568func TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
4569	defer leakcheck.Check(t)
4570	for _, e := range listTestEnv() {
4571		testClientRequestBodyErrorUnexpectedEOF(t, e)
4572	}
4573}
4574
4575func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
4576	te := newTest(t, e)
4577	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4578		errUnexpectedCall := errors.New("unexpected call func server method")
4579		t.Error(errUnexpectedCall)
4580		return nil, errUnexpectedCall
4581	}}
4582	te.startServer(ts)
4583	defer te.tearDown()
4584	te.withServerTester(func(st *serverTester) {
4585		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4586		// Say we have 5 bytes coming, but set END_STREAM flag:
4587		st.writeData(1, true, []byte{0, 0, 0, 0, 5})
4588		st.wantAnyFrame() // wait for server to crash (it used to crash)
4589	})
4590}
4591
4592func TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
4593	defer leakcheck.Check(t)
4594	for _, e := range listTestEnv() {
4595		testClientRequestBodyErrorCloseAfterLength(t, e)
4596	}
4597}
4598
4599func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
4600	te := newTest(t, e)
4601	te.declareLogNoise("Server.processUnaryRPC failed to write status")
4602	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4603		errUnexpectedCall := errors.New("unexpected call func server method")
4604		t.Error(errUnexpectedCall)
4605		return nil, errUnexpectedCall
4606	}}
4607	te.startServer(ts)
4608	defer te.tearDown()
4609	te.withServerTester(func(st *serverTester) {
4610		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4611		// say we're sending 5 bytes, but then close the connection instead.
4612		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4613		st.cc.Close()
4614	})
4615}
4616
4617func TestClientRequestBodyErrorCancel(t *testing.T) {
4618	defer leakcheck.Check(t)
4619	for _, e := range listTestEnv() {
4620		testClientRequestBodyErrorCancel(t, e)
4621	}
4622}
4623
4624func testClientRequestBodyErrorCancel(t *testing.T, e env) {
4625	te := newTest(t, e)
4626	gotCall := make(chan bool, 1)
4627	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4628		gotCall <- true
4629		return new(testpb.SimpleResponse), nil
4630	}}
4631	te.startServer(ts)
4632	defer te.tearDown()
4633	te.withServerTester(func(st *serverTester) {
4634		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4635		// Say we have 5 bytes coming, but cancel it instead.
4636		st.writeRSTStream(1, http2.ErrCodeCancel)
4637		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4638
4639		// Verify we didn't a call yet.
4640		select {
4641		case <-gotCall:
4642			t.Fatal("unexpected call")
4643		default:
4644		}
4645
4646		// And now send an uncanceled (but still invalid), just to get a response.
4647		st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall")
4648		st.writeData(3, true, []byte{0, 0, 0, 0, 0})
4649		<-gotCall
4650		st.wantAnyFrame()
4651	})
4652}
4653
4654func TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
4655	defer leakcheck.Check(t)
4656	for _, e := range listTestEnv() {
4657		testClientRequestBodyErrorCancelStreamingInput(t, e)
4658	}
4659}
4660
4661func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
4662	te := newTest(t, e)
4663	recvErr := make(chan error, 1)
4664	ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
4665		_, err := stream.Recv()
4666		recvErr <- err
4667		return nil
4668	}}
4669	te.startServer(ts)
4670	defer te.tearDown()
4671	te.withServerTester(func(st *serverTester) {
4672		st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall")
4673		// Say we have 5 bytes coming, but cancel it instead.
4674		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4675		st.writeRSTStream(1, http2.ErrCodeCancel)
4676
4677		var got error
4678		select {
4679		case got = <-recvErr:
4680		case <-time.After(3 * time.Second):
4681			t.Fatal("timeout waiting for error")
4682		}
4683		if grpc.Code(got) != codes.Canceled {
4684			t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
4685		}
4686	})
4687}
4688
4689func TestClientResourceExhaustedCancelFullDuplex(t *testing.T) {
4690	defer leakcheck.Check(t)
4691	for _, e := range listTestEnv() {
4692		if e.httpHandler {
4693			// httpHandler write won't be blocked on flow control window.
4694			continue
4695		}
4696		testClientResourceExhaustedCancelFullDuplex(t, e)
4697	}
4698}
4699
4700func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) {
4701	te := newTest(t, e)
4702	recvErr := make(chan error, 1)
4703	ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
4704		defer close(recvErr)
4705		_, err := stream.Recv()
4706		if err != nil {
4707			return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err)
4708		}
4709		// create a payload that's larger than the default flow control window.
4710		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10)
4711		if err != nil {
4712			return err
4713		}
4714		resp := &testpb.StreamingOutputCallResponse{
4715			Payload: payload,
4716		}
4717		ce := make(chan error)
4718		go func() {
4719			var err error
4720			for {
4721				if err = stream.Send(resp); err != nil {
4722					break
4723				}
4724			}
4725			ce <- err
4726		}()
4727		select {
4728		case err = <-ce:
4729		case <-time.After(10 * time.Second):
4730			err = errors.New("10s timeout reached")
4731		}
4732		recvErr <- err
4733		return err
4734	}}
4735	te.startServer(ts)
4736	defer te.tearDown()
4737	// set a low limit on receive message size to error with Resource Exhausted on
4738	// client side when server send a large message.
4739	te.maxClientReceiveMsgSize = newInt(10)
4740	cc := te.clientConn()
4741	tc := testpb.NewTestServiceClient(cc)
4742	stream, err := tc.FullDuplexCall(context.Background())
4743	if err != nil {
4744		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4745	}
4746	req := &testpb.StreamingOutputCallRequest{}
4747	if err := stream.Send(req); err != nil {
4748		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
4749	}
4750	if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
4751		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
4752	}
4753	err = <-recvErr
4754	if status.Code(err) != codes.Canceled {
4755		t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled)
4756	}
4757}
4758
4759type clientTimeoutCreds struct {
4760	timeoutReturned bool
4761}
4762
4763func (c *clientTimeoutCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4764	if !c.timeoutReturned {
4765		c.timeoutReturned = true
4766		return nil, nil, context.DeadlineExceeded
4767	}
4768	return rawConn, nil, nil
4769}
4770func (c *clientTimeoutCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4771	return rawConn, nil, nil
4772}
4773func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo {
4774	return credentials.ProtocolInfo{}
4775}
4776func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials {
4777	return nil
4778}
4779func (c *clientTimeoutCreds) OverrideServerName(s string) error {
4780	return nil
4781}
4782
4783func TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
4784	te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"})
4785	te.userAgent = testAppUA
4786	te.startServer(&testServer{security: te.e.security})
4787	defer te.tearDown()
4788
4789	cc := te.clientConn()
4790	tc := testpb.NewTestServiceClient(cc)
4791	// This unary call should succeed, because ClientHandshake will succeed for the second time.
4792	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
4793		te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err)
4794	}
4795}
4796
4797type serverDispatchCred struct {
4798	rawConnCh chan net.Conn
4799}
4800
4801func newServerDispatchCred() *serverDispatchCred {
4802	return &serverDispatchCred{
4803		rawConnCh: make(chan net.Conn, 1),
4804	}
4805}
4806func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4807	return rawConn, nil, nil
4808}
4809func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4810	select {
4811	case c.rawConnCh <- rawConn:
4812	default:
4813	}
4814	return nil, nil, credentials.ErrConnDispatched
4815}
4816func (c *serverDispatchCred) Info() credentials.ProtocolInfo {
4817	return credentials.ProtocolInfo{}
4818}
4819func (c *serverDispatchCred) Clone() credentials.TransportCredentials {
4820	return nil
4821}
4822func (c *serverDispatchCred) OverrideServerName(s string) error {
4823	return nil
4824}
4825func (c *serverDispatchCred) getRawConn() net.Conn {
4826	return <-c.rawConnCh
4827}
4828
4829func TestServerCredsDispatch(t *testing.T) {
4830	lis, err := net.Listen("tcp", "localhost:0")
4831	if err != nil {
4832		t.Fatalf("Failed to listen: %v", err)
4833	}
4834	cred := newServerDispatchCred()
4835	s := grpc.NewServer(grpc.Creds(cred))
4836	go s.Serve(lis)
4837	defer s.Stop()
4838
4839	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred))
4840	if err != nil {
4841		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4842	}
4843	defer cc.Close()
4844
4845	rawConn := cred.getRawConn()
4846	// Give grpc a chance to see the error and potentially close the connection.
4847	// And check that connection is not closed after that.
4848	time.Sleep(100 * time.Millisecond)
4849	// Check rawConn is not closed.
4850	if n, err := rawConn.Write([]byte{0}); n <= 0 || err != nil {
4851		t.Errorf("Read() = %v, %v; want n>0, <nil>", n, err)
4852	}
4853}
4854
4855type authorityCheckCreds struct {
4856	got string
4857}
4858
4859func (c *authorityCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4860	return rawConn, nil, nil
4861}
4862func (c *authorityCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4863	c.got = authority
4864	return rawConn, nil, nil
4865}
4866func (c *authorityCheckCreds) Info() credentials.ProtocolInfo {
4867	return credentials.ProtocolInfo{}
4868}
4869func (c *authorityCheckCreds) Clone() credentials.TransportCredentials {
4870	return c
4871}
4872func (c *authorityCheckCreds) OverrideServerName(s string) error {
4873	return nil
4874}
4875
4876// This test makes sure that the authority client handshake gets is the endpoint
4877// in dial target, not the resolved ip address.
4878func TestCredsHandshakeAuthority(t *testing.T) {
4879	const testAuthority = "test.auth.ori.ty"
4880
4881	lis, err := net.Listen("tcp", "localhost:0")
4882	if err != nil {
4883		t.Fatalf("Failed to listen: %v", err)
4884	}
4885	cred := &authorityCheckCreds{}
4886	s := grpc.NewServer()
4887	go s.Serve(lis)
4888	defer s.Stop()
4889
4890	r, rcleanup := manual.GenerateAndRegisterManualResolver()
4891	defer rcleanup()
4892
4893	cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred))
4894	if err != nil {
4895		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4896	}
4897	defer cc.Close()
4898	r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
4899
4900	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
4901	defer cancel()
4902	for {
4903		s := cc.GetState()
4904		if s == connectivity.Ready {
4905			break
4906		}
4907		if !cc.WaitForStateChange(ctx, s) {
4908			// ctx got timeout or canceled.
4909			t.Fatalf("ClientConn is not ready after 100 ms")
4910		}
4911	}
4912
4913	if cred.got != testAuthority {
4914		t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority)
4915	}
4916}
4917
4918type clientFailCreds struct{}
4919
4920func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4921	return rawConn, nil, nil
4922}
4923func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4924	return nil, nil, fmt.Errorf("client handshake fails with fatal error")
4925}
4926func (c *clientFailCreds) Info() credentials.ProtocolInfo {
4927	return credentials.ProtocolInfo{}
4928}
4929func (c *clientFailCreds) Clone() credentials.TransportCredentials {
4930	return c
4931}
4932func (c *clientFailCreds) OverrideServerName(s string) error {
4933	return nil
4934}
4935
4936// This test makes sure that failfast RPCs fail if client handshake fails with
4937// fatal errors.
4938func TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) {
4939	lis, err := net.Listen("tcp", "localhost:0")
4940	if err != nil {
4941		t.Fatalf("Failed to listen: %v", err)
4942	}
4943	defer lis.Close()
4944
4945	cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{}))
4946	if err != nil {
4947		t.Fatalf("grpc.Dial(_) = %v", err)
4948	}
4949	defer cc.Close()
4950
4951	tc := testpb.NewTestServiceClient(cc)
4952	// This unary call should fail, but not timeout.
4953	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
4954	defer cancel()
4955	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(true)); status.Code(err) != codes.Unavailable {
4956		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err)
4957	}
4958}
4959
4960func TestFlowControlLogicalRace(t *testing.T) {
4961	// Test for a regression of https://github.com/grpc/grpc-go/issues/632,
4962	// and other flow control bugs.
4963
4964	defer leakcheck.Check(t)
4965
4966	const (
4967		itemCount   = 100
4968		itemSize    = 1 << 10
4969		recvCount   = 2
4970		maxFailures = 3
4971
4972		requestTimeout = time.Second * 5
4973	)
4974
4975	requestCount := 10000
4976	if raceMode {
4977		requestCount = 1000
4978	}
4979
4980	lis, err := net.Listen("tcp", "localhost:0")
4981	if err != nil {
4982		t.Fatalf("Failed to listen: %v", err)
4983	}
4984	defer lis.Close()
4985
4986	s := grpc.NewServer()
4987	testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{
4988		itemCount: itemCount,
4989		itemSize:  itemSize,
4990	})
4991	defer s.Stop()
4992
4993	go s.Serve(lis)
4994
4995	ctx := context.Background()
4996
4997	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
4998	if err != nil {
4999		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
5000	}
5001	defer cc.Close()
5002	cl := testpb.NewTestServiceClient(cc)
5003
5004	failures := 0
5005	for i := 0; i < requestCount; i++ {
5006		ctx, cancel := context.WithTimeout(ctx, requestTimeout)
5007		output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
5008		if err != nil {
5009			t.Fatalf("StreamingOutputCall; err = %q", err)
5010		}
5011
5012		j := 0
5013	loop:
5014		for ; j < recvCount; j++ {
5015			_, err := output.Recv()
5016			if err != nil {
5017				if err == io.EOF {
5018					break loop
5019				}
5020				switch status.Code(err) {
5021				case codes.DeadlineExceeded:
5022					break loop
5023				default:
5024					t.Fatalf("Recv; err = %q", err)
5025				}
5026			}
5027		}
5028		cancel()
5029		<-ctx.Done()
5030
5031		if j < recvCount {
5032			t.Errorf("got %d responses to request %d", j, i)
5033			failures++
5034			if failures >= maxFailures {
5035				// Continue past the first failure to see if the connection is
5036				// entirely broken, or if only a single RPC was affected
5037				break
5038			}
5039		}
5040	}
5041}
5042
5043type flowControlLogicalRaceServer struct {
5044	testpb.TestServiceServer
5045
5046	itemSize  int
5047	itemCount int
5048}
5049
5050func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error {
5051	for i := 0; i < s.itemCount; i++ {
5052		err := srv.Send(&testpb.StreamingOutputCallResponse{
5053			Payload: &testpb.Payload{
5054				// Sending a large stream of data which the client reject
5055				// helps to trigger some types of flow control bugs.
5056				//
5057				// Reallocating memory here is inefficient, but the stress it
5058				// puts on the GC leads to more frequent flow control
5059				// failures. The GC likely causes more variety in the
5060				// goroutine scheduling orders.
5061				Body: bytes.Repeat([]byte("a"), s.itemSize),
5062			},
5063		})
5064		if err != nil {
5065			return err
5066		}
5067	}
5068	return nil
5069}
5070
5071type lockingWriter struct {
5072	mu sync.Mutex
5073	w  io.Writer
5074}
5075
5076func (lw *lockingWriter) Write(p []byte) (n int, err error) {
5077	lw.mu.Lock()
5078	defer lw.mu.Unlock()
5079	return lw.w.Write(p)
5080}
5081
5082func (lw *lockingWriter) setWriter(w io.Writer) {
5083	lw.mu.Lock()
5084	defer lw.mu.Unlock()
5085	lw.w = w
5086}
5087
5088var testLogOutput = &lockingWriter{w: os.Stderr}
5089
5090// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
5091// terminate, if they're still running. It spams logs with this
5092// message.  We wait for it so our log filter is still
5093// active. Otherwise the "defer restore()" at the top of various test
5094// functions restores our log filter and then the goroutine spams.
5095func awaitNewConnLogOutput() {
5096	awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
5097}
5098
5099func awaitLogOutput(maxWait time.Duration, phrase string) {
5100	pb := []byte(phrase)
5101
5102	timer := time.NewTimer(maxWait)
5103	defer timer.Stop()
5104	wakeup := make(chan bool, 1)
5105	for {
5106		if logOutputHasContents(pb, wakeup) {
5107			return
5108		}
5109		select {
5110		case <-timer.C:
5111			// Too slow. Oh well.
5112			return
5113		case <-wakeup:
5114		}
5115	}
5116}
5117
5118func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
5119	testLogOutput.mu.Lock()
5120	defer testLogOutput.mu.Unlock()
5121	fw, ok := testLogOutput.w.(*filterWriter)
5122	if !ok {
5123		return false
5124	}
5125	fw.mu.Lock()
5126	defer fw.mu.Unlock()
5127	if bytes.Contains(fw.buf.Bytes(), v) {
5128		return true
5129	}
5130	fw.wakeup = wakeup
5131	return false
5132}
5133
5134var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering")
5135
5136func noop() {}
5137
5138// declareLogNoise declares that t is expected to emit the following noisy phrases,
5139// even on success. Those phrases will be filtered from grpclog output
5140// and only be shown if *verbose_logs or t ends up failing.
5141// The returned restore function should be called with defer to be run
5142// before the test ends.
5143func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
5144	if *verboseLogs {
5145		return noop
5146	}
5147	fw := &filterWriter{dst: os.Stderr, filter: phrases}
5148	testLogOutput.setWriter(fw)
5149	return func() {
5150		if t.Failed() {
5151			fw.mu.Lock()
5152			defer fw.mu.Unlock()
5153			if fw.buf.Len() > 0 {
5154				t.Logf("Complete log output:\n%s", fw.buf.Bytes())
5155			}
5156		}
5157		testLogOutput.setWriter(os.Stderr)
5158	}
5159}
5160
5161type filterWriter struct {
5162	dst    io.Writer
5163	filter []string
5164
5165	mu     sync.Mutex
5166	buf    bytes.Buffer
5167	wakeup chan<- bool // if non-nil, gets true on write
5168}
5169
5170func (fw *filterWriter) Write(p []byte) (n int, err error) {
5171	fw.mu.Lock()
5172	fw.buf.Write(p)
5173	if fw.wakeup != nil {
5174		select {
5175		case fw.wakeup <- true:
5176		default:
5177		}
5178	}
5179	fw.mu.Unlock()
5180
5181	ps := string(p)
5182	for _, f := range fw.filter {
5183		if strings.Contains(ps, f) {
5184			return len(p), nil
5185		}
5186	}
5187	return fw.dst.Write(p)
5188}
5189
5190// stubServer is a server that is easy to customize within individual test
5191// cases.
5192type stubServer struct {
5193	// Guarantees we satisfy this interface; panics if unimplemented methods are called.
5194	testpb.TestServiceServer
5195
5196	// Customizable implementations of server handlers.
5197	emptyCall      func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
5198	fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error
5199
5200	// A client connected to this service the test may use.  Created in Start().
5201	client testpb.TestServiceClient
5202	cc     *grpc.ClientConn
5203
5204	cleanups []func() // Lambdas executed in Stop(); populated by Start().
5205
5206	r *manual.Resolver
5207}
5208
5209func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5210	return ss.emptyCall(ctx, in)
5211}
5212
5213func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
5214	return ss.fullDuplexCall(stream)
5215}
5216
5217// Start starts the server and creates a client connected to it.
5218func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
5219	r, cleanup := manual.GenerateAndRegisterManualResolver()
5220	ss.r = r
5221	ss.cleanups = append(ss.cleanups, cleanup)
5222
5223	lis, err := net.Listen("tcp", "localhost:0")
5224	if err != nil {
5225		return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err)
5226	}
5227	ss.cleanups = append(ss.cleanups, func() { lis.Close() })
5228
5229	s := grpc.NewServer(sopts...)
5230	testpb.RegisterTestServiceServer(s, ss)
5231	go s.Serve(lis)
5232	ss.cleanups = append(ss.cleanups, s.Stop)
5233
5234	target := ss.r.Scheme() + ":///" + lis.Addr().String()
5235
5236	opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...)
5237	cc, err := grpc.Dial(target, opts...)
5238	if err != nil {
5239		return fmt.Errorf("grpc.Dial(%q) = %v", target, err)
5240	}
5241	ss.cc = cc
5242	ss.r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
5243	if err := ss.waitForReady(cc); err != nil {
5244		return err
5245	}
5246
5247	ss.cleanups = append(ss.cleanups, func() { cc.Close() })
5248
5249	ss.client = testpb.NewTestServiceClient(cc)
5250	return nil
5251}
5252
5253func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error {
5254	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
5255	defer cancel()
5256	for {
5257		s := cc.GetState()
5258		if s == connectivity.Ready {
5259			return nil
5260		}
5261		if !cc.WaitForStateChange(ctx, s) {
5262			// ctx got timeout or canceled.
5263			return ctx.Err()
5264		}
5265	}
5266}
5267
5268func (ss *stubServer) Stop() {
5269	for i := len(ss.cleanups) - 1; i >= 0; i-- {
5270		ss.cleanups[i]()
5271	}
5272}
5273
5274func TestGRPCMethod(t *testing.T) {
5275	defer leakcheck.Check(t)
5276	var method string
5277	var ok bool
5278
5279	ss := &stubServer{
5280		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5281			method, ok = grpc.Method(ctx)
5282			return &testpb.Empty{}, nil
5283		},
5284	}
5285	if err := ss.Start(nil); err != nil {
5286		t.Fatalf("Error starting endpoint server: %v", err)
5287	}
5288	defer ss.Stop()
5289
5290	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5291	defer cancel()
5292
5293	if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5294		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, nil", err)
5295	}
5296
5297	if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want {
5298		t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want)
5299	}
5300}
5301
5302func TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
5303	const mdkey = "somedata"
5304
5305	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
5306	endpoint := &stubServer{
5307		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5308			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
5309				return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5310			}
5311			return &testpb.Empty{}, nil
5312		},
5313	}
5314	if err := endpoint.Start(nil); err != nil {
5315		t.Fatalf("Error starting endpoint server: %v", err)
5316	}
5317	defer endpoint.Stop()
5318
5319	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
5320	// without explicitly copying the metadata.
5321	proxy := &stubServer{
5322		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5323			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
5324				return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey)
5325			}
5326			return endpoint.client.EmptyCall(ctx, in)
5327		},
5328	}
5329	if err := proxy.Start(nil); err != nil {
5330		t.Fatalf("Error starting proxy server: %v", err)
5331	}
5332	defer proxy.Stop()
5333
5334	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5335	defer cancel()
5336	md := metadata.Pairs(mdkey, "val")
5337	ctx = metadata.NewOutgoingContext(ctx, md)
5338
5339	// Sanity check that endpoint properly errors when it sees mdkey.
5340	_, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{})
5341	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
5342		t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err)
5343	}
5344
5345	if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5346		t.Fatal(err.Error())
5347	}
5348}
5349
5350func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
5351	const mdkey = "somedata"
5352
5353	// doFDC performs a FullDuplexCall with client and returns the error from the
5354	// first stream.Recv call, or nil if that error is io.EOF.  Calls t.Fatal if
5355	// the stream cannot be established.
5356	doFDC := func(ctx context.Context, client testpb.TestServiceClient) error {
5357		stream, err := client.FullDuplexCall(ctx)
5358		if err != nil {
5359			t.Fatalf("Unwanted error: %v", err)
5360		}
5361		if _, err := stream.Recv(); err != io.EOF {
5362			return err
5363		}
5364		return nil
5365	}
5366
5367	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
5368	endpoint := &stubServer{
5369		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5370			ctx := stream.Context()
5371			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
5372				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5373			}
5374			return nil
5375		},
5376	}
5377	if err := endpoint.Start(nil); err != nil {
5378		t.Fatalf("Error starting endpoint server: %v", err)
5379	}
5380	defer endpoint.Stop()
5381
5382	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
5383	// without explicitly copying the metadata.
5384	proxy := &stubServer{
5385		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5386			ctx := stream.Context()
5387			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
5388				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5389			}
5390			return doFDC(ctx, endpoint.client)
5391		},
5392	}
5393	if err := proxy.Start(nil); err != nil {
5394		t.Fatalf("Error starting proxy server: %v", err)
5395	}
5396	defer proxy.Stop()
5397
5398	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5399	defer cancel()
5400	md := metadata.Pairs(mdkey, "val")
5401	ctx = metadata.NewOutgoingContext(ctx, md)
5402
5403	// Sanity check that endpoint properly errors when it sees mdkey in ctx.
5404	err := doFDC(ctx, endpoint.client)
5405	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
5406		t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err)
5407	}
5408
5409	if err := doFDC(ctx, proxy.client); err != nil {
5410		t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
5411	}
5412}
5413
5414func TestStatsTagsAndTrace(t *testing.T) {
5415	// Data added to context by client (typically in a stats handler).
5416	tags := []byte{1, 5, 2, 4, 3}
5417	trace := []byte{5, 2, 1, 3, 4}
5418
5419	// endpoint ensures Tags() and Trace() in context match those that were added
5420	// by the client and returns an error if not.
5421	endpoint := &stubServer{
5422		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5423			md, _ := metadata.FromIncomingContext(ctx)
5424			if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
5425				return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
5426			}
5427			if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
5428				return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
5429			}
5430			if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
5431				return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
5432			}
5433			if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
5434				return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
5435			}
5436			return &testpb.Empty{}, nil
5437		},
5438	}
5439	if err := endpoint.Start(nil); err != nil {
5440		t.Fatalf("Error starting endpoint server: %v", err)
5441	}
5442	defer endpoint.Stop()
5443
5444	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5445	defer cancel()
5446
5447	testCases := []struct {
5448		ctx  context.Context
5449		want codes.Code
5450	}{
5451		{ctx: ctx, want: codes.Internal},
5452		{ctx: stats.SetTags(ctx, tags), want: codes.Internal},
5453		{ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
5454		{ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
5455		{ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
5456	}
5457
5458	for _, tc := range testCases {
5459		_, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{})
5460		if tc.want == codes.OK && err != nil {
5461			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
5462		}
5463		if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
5464			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
5465		}
5466	}
5467}
5468
5469func TestTapTimeout(t *testing.T) {
5470	sopts := []grpc.ServerOption{
5471		grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) {
5472			c, cancel := context.WithCancel(ctx)
5473			// Call cancel instead of setting a deadline so we can detect which error
5474			// occurred -- this cancellation (desired) or the client's deadline
5475			// expired (indicating this cancellation did not affect the RPC).
5476			time.AfterFunc(10*time.Millisecond, cancel)
5477			return c, nil
5478		}),
5479	}
5480
5481	ss := &stubServer{
5482		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5483			<-ctx.Done()
5484			return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
5485		},
5486	}
5487	if err := ss.Start(sopts); err != nil {
5488		t.Fatalf("Error starting endpoint server: %v", err)
5489	}
5490	defer ss.Stop()
5491
5492	// This was known to be flaky; test several times.
5493	for i := 0; i < 10; i++ {
5494		// Set our own deadline in case the server hangs.
5495		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5496		res, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
5497		cancel()
5498		if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
5499			t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err)
5500		}
5501	}
5502
5503}
5504
5505func TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
5506	ss := &stubServer{
5507		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5508			return status.Errorf(codes.Internal, "")
5509		},
5510	}
5511	sopts := []grpc.ServerOption{}
5512	if err := ss.Start(sopts); err != nil {
5513		t.Fatalf("Error starting endpoint server: %v", err)
5514	}
5515	defer ss.Stop()
5516	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
5517	defer cancel()
5518	stream, err := ss.client.FullDuplexCall(ctx)
5519	if err != nil {
5520		t.Fatalf("Error while creating stream: %v", err)
5521	}
5522	for {
5523		if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil {
5524			time.Sleep(5 * time.Millisecond)
5525		} else if err == io.EOF {
5526			break // Success.
5527		} else {
5528			t.Fatalf("stream.Send(_) = %v, want io.EOF", err)
5529		}
5530	}
5531}
5532
5533type windowSizeConfig struct {
5534	serverStream int32
5535	serverConn   int32
5536	clientStream int32
5537	clientConn   int32
5538}
5539
5540func max(a, b int32) int32 {
5541	if a > b {
5542		return a
5543	}
5544	return b
5545}
5546
5547func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
5548	defer leakcheck.Check(t)
5549	wc := windowSizeConfig{
5550		serverStream: 8 * 1024 * 1024,
5551		serverConn:   12 * 1024 * 1024,
5552		clientStream: 6 * 1024 * 1024,
5553		clientConn:   8 * 1024 * 1024,
5554	}
5555	for _, e := range listTestEnv() {
5556		testConfigurableWindowSize(t, e, wc)
5557	}
5558}
5559
5560func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
5561	defer leakcheck.Check(t)
5562	wc := windowSizeConfig{
5563		serverStream: 1,
5564		serverConn:   1,
5565		clientStream: 1,
5566		clientConn:   1,
5567	}
5568	for _, e := range listTestEnv() {
5569		testConfigurableWindowSize(t, e, wc)
5570	}
5571}
5572
5573func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
5574	te := newTest(t, e)
5575	te.serverInitialWindowSize = wc.serverStream
5576	te.serverInitialConnWindowSize = wc.serverConn
5577	te.clientInitialWindowSize = wc.clientStream
5578	te.clientInitialConnWindowSize = wc.clientConn
5579
5580	te.startServer(&testServer{security: e.security})
5581	defer te.tearDown()
5582
5583	cc := te.clientConn()
5584	tc := testpb.NewTestServiceClient(cc)
5585	stream, err := tc.FullDuplexCall(context.Background())
5586	if err != nil {
5587		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5588	}
5589	numOfIter := 11
5590	// Set message size to exhaust largest of window sizes.
5591	messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
5592	messageSize = max(messageSize, 64*1024)
5593	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
5594	if err != nil {
5595		t.Fatal(err)
5596	}
5597	respParams := []*testpb.ResponseParameters{
5598		{
5599			Size: messageSize,
5600		},
5601	}
5602	req := &testpb.StreamingOutputCallRequest{
5603		ResponseType:       testpb.PayloadType_COMPRESSABLE,
5604		ResponseParameters: respParams,
5605		Payload:            payload,
5606	}
5607	for i := 0; i < numOfIter; i++ {
5608		if err := stream.Send(req); err != nil {
5609			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
5610		}
5611		if _, err := stream.Recv(); err != nil {
5612			t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
5613		}
5614	}
5615	if err := stream.CloseSend(); err != nil {
5616		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
5617	}
5618}
5619
5620var (
5621	// test authdata
5622	authdata = map[string]string{
5623		"test-key":      "test-value",
5624		"test-key2-bin": string([]byte{1, 2, 3}),
5625	}
5626)
5627
5628type testPerRPCCredentials struct{}
5629
5630func (cr testPerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
5631	return authdata, nil
5632}
5633
5634func (cr testPerRPCCredentials) RequireTransportSecurity() bool {
5635	return false
5636}
5637
5638func authHandle(ctx context.Context, info *tap.Info) (context.Context, error) {
5639	md, ok := metadata.FromIncomingContext(ctx)
5640	if !ok {
5641		return ctx, fmt.Errorf("didn't find metadata in context")
5642	}
5643	for k, vwant := range authdata {
5644		vgot, ok := md[k]
5645		if !ok {
5646			return ctx, fmt.Errorf("didn't find authdata key %v in context", k)
5647		}
5648		if vgot[0] != vwant {
5649			return ctx, fmt.Errorf("for key %v, got value %v, want %v", k, vgot, vwant)
5650		}
5651	}
5652	return ctx, nil
5653}
5654
5655func TestPerRPCCredentialsViaDialOptions(t *testing.T) {
5656	defer leakcheck.Check(t)
5657	for _, e := range listTestEnv() {
5658		testPerRPCCredentialsViaDialOptions(t, e)
5659	}
5660}
5661
5662func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) {
5663	te := newTest(t, e)
5664	te.tapHandle = authHandle
5665	te.perRPCCreds = testPerRPCCredentials{}
5666	te.startServer(&testServer{security: e.security})
5667	defer te.tearDown()
5668
5669	cc := te.clientConn()
5670	tc := testpb.NewTestServiceClient(cc)
5671	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
5672		t.Fatalf("Test failed. Reason: %v", err)
5673	}
5674}
5675
5676func TestPerRPCCredentialsViaCallOptions(t *testing.T) {
5677	defer leakcheck.Check(t)
5678	for _, e := range listTestEnv() {
5679		testPerRPCCredentialsViaCallOptions(t, e)
5680	}
5681}
5682
5683func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) {
5684	te := newTest(t, e)
5685	te.tapHandle = authHandle
5686	te.startServer(&testServer{security: e.security})
5687	defer te.tearDown()
5688
5689	cc := te.clientConn()
5690	tc := testpb.NewTestServiceClient(cc)
5691	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
5692		t.Fatalf("Test failed. Reason: %v", err)
5693	}
5694}
5695
5696func TestPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T) {
5697	defer leakcheck.Check(t)
5698	for _, e := range listTestEnv() {
5699		testPerRPCCredentialsViaDialOptionsAndCallOptions(t, e)
5700	}
5701}
5702
5703func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) {
5704	te := newTest(t, e)
5705	te.perRPCCreds = testPerRPCCredentials{}
5706	// When credentials are provided via both dial options and call options,
5707	// we apply both sets.
5708	te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
5709		md, ok := metadata.FromIncomingContext(ctx)
5710		if !ok {
5711			return ctx, fmt.Errorf("couldn't find metadata in context")
5712		}
5713		for k, vwant := range authdata {
5714			vgot, ok := md[k]
5715			if !ok {
5716				return ctx, fmt.Errorf("couldn't find metadata for key %v", k)
5717			}
5718			if len(vgot) != 2 {
5719				return ctx, fmt.Errorf("len of value for key %v was %v, want 2", k, len(vgot))
5720			}
5721			if vgot[0] != vwant || vgot[1] != vwant {
5722				return ctx, fmt.Errorf("value for %v was %v, want [%v, %v]", k, vgot, vwant, vwant)
5723			}
5724		}
5725		return ctx, nil
5726	}
5727	te.startServer(&testServer{security: e.security})
5728	defer te.tearDown()
5729
5730	cc := te.clientConn()
5731	tc := testpb.NewTestServiceClient(cc)
5732	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
5733		t.Fatalf("Test failed. Reason: %v", err)
5734	}
5735}
5736
5737func TestWaitForReadyConnection(t *testing.T) {
5738	defer leakcheck.Check(t)
5739	for _, e := range listTestEnv() {
5740		testWaitForReadyConnection(t, e)
5741	}
5742
5743}
5744
5745func testWaitForReadyConnection(t *testing.T, e env) {
5746	te := newTest(t, e)
5747	te.userAgent = testAppUA
5748	te.startServer(&testServer{security: e.security})
5749	defer te.tearDown()
5750
5751	cc := te.clientConn() // Non-blocking dial.
5752	tc := testpb.NewTestServiceClient(cc)
5753	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
5754	defer cancel()
5755	state := cc.GetState()
5756	// Wait for connection to be Ready.
5757	for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
5758	}
5759	if state != connectivity.Ready {
5760		t.Fatalf("Want connection state to be Ready, got %v", state)
5761	}
5762	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
5763	defer cancel()
5764	// Make a fail-fast RPC.
5765	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5766		t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err)
5767	}
5768}
5769
5770type errCodec struct {
5771	noError bool
5772}
5773
5774func (c *errCodec) Marshal(v interface{}) ([]byte, error) {
5775	if c.noError {
5776		return []byte{}, nil
5777	}
5778	return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12")
5779}
5780
5781func (c *errCodec) Unmarshal(data []byte, v interface{}) error {
5782	return nil
5783}
5784
5785func (c *errCodec) String() string {
5786	return "Fermat's near-miss."
5787}
5788
5789func TestEncodeDoesntPanic(t *testing.T) {
5790	defer leakcheck.Check(t)
5791	for _, e := range listTestEnv() {
5792		testEncodeDoesntPanic(t, e)
5793	}
5794}
5795
5796func testEncodeDoesntPanic(t *testing.T, e env) {
5797	te := newTest(t, e)
5798	erc := &errCodec{}
5799	te.customCodec = erc
5800	te.startServer(&testServer{security: e.security})
5801	defer te.tearDown()
5802	te.customCodec = nil
5803	tc := testpb.NewTestServiceClient(te.clientConn())
5804	// Failure case, should not panic.
5805	tc.EmptyCall(context.Background(), &testpb.Empty{})
5806	erc.noError = true
5807	// Passing case.
5808	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
5809		t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err)
5810	}
5811}
5812
5813func TestSvrWriteStatusEarlyWrite(t *testing.T) {
5814	defer leakcheck.Check(t)
5815	for _, e := range listTestEnv() {
5816		testSvrWriteStatusEarlyWrite(t, e)
5817	}
5818}
5819
5820func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
5821	te := newTest(t, e)
5822	const smallSize = 1024
5823	const largeSize = 2048
5824	const extraLargeSize = 4096
5825	te.maxServerReceiveMsgSize = newInt(largeSize)
5826	te.maxServerSendMsgSize = newInt(largeSize)
5827	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
5828	if err != nil {
5829		t.Fatal(err)
5830	}
5831	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
5832	if err != nil {
5833		t.Fatal(err)
5834	}
5835	te.startServer(&testServer{security: e.security})
5836	defer te.tearDown()
5837	tc := testpb.NewTestServiceClient(te.clientConn())
5838	respParam := []*testpb.ResponseParameters{
5839		{
5840			Size: int32(smallSize),
5841		},
5842	}
5843	sreq := &testpb.StreamingOutputCallRequest{
5844		ResponseType:       testpb.PayloadType_COMPRESSABLE,
5845		ResponseParameters: respParam,
5846		Payload:            extraLargePayload,
5847	}
5848	// Test recv case: server receives a message larger than maxServerReceiveMsgSize.
5849	stream, err := tc.FullDuplexCall(te.ctx)
5850	if err != nil {
5851		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5852	}
5853	if err = stream.Send(sreq); err != nil {
5854		t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
5855	}
5856	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
5857		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5858	}
5859	// Test send case: server sends a message larger than maxServerSendMsgSize.
5860	sreq.Payload = smallPayload
5861	respParam[0].Size = int32(extraLargeSize)
5862
5863	stream, err = tc.FullDuplexCall(te.ctx)
5864	if err != nil {
5865		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5866	}
5867	if err = stream.Send(sreq); err != nil {
5868		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5869	}
5870	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
5871		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5872	}
5873}
5874
5875// The following functions with function name ending with TD indicates that they
5876// should be deleted after old service config API is deprecated and deleted.
5877func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) {
5878	te := newTest(t, e)
5879	// We write before read.
5880	ch := make(chan grpc.ServiceConfig, 1)
5881	te.sc = ch
5882	te.userAgent = testAppUA
5883	te.declareLogNoise(
5884		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
5885		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
5886		"grpc: addrConn.resetTransport failed to create client transport: connection error",
5887		"Failed to dial : context canceled; please retry.",
5888	)
5889	return te, ch
5890}
5891
5892func TestServiceConfigGetMethodConfigTD(t *testing.T) {
5893	defer leakcheck.Check(t)
5894	for _, e := range listTestEnv() {
5895		testGetMethodConfigTD(t, e)
5896	}
5897}
5898
5899func testGetMethodConfigTD(t *testing.T, e env) {
5900	te, ch := testServiceConfigSetupTD(t, e)
5901	defer te.tearDown()
5902
5903	mc1 := grpc.MethodConfig{
5904		WaitForReady: newBool(true),
5905		Timeout:      newDuration(time.Millisecond),
5906	}
5907	mc2 := grpc.MethodConfig{WaitForReady: newBool(false)}
5908	m := make(map[string]grpc.MethodConfig)
5909	m["/grpc.testing.TestService/EmptyCall"] = mc1
5910	m["/grpc.testing.TestService/"] = mc2
5911	sc := grpc.ServiceConfig{
5912		Methods: m,
5913	}
5914	ch <- sc
5915
5916	cc := te.clientConn()
5917	tc := testpb.NewTestServiceClient(cc)
5918	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5919	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
5920		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5921	}
5922
5923	m = make(map[string]grpc.MethodConfig)
5924	m["/grpc.testing.TestService/UnaryCall"] = mc1
5925	m["/grpc.testing.TestService/"] = mc2
5926	sc = grpc.ServiceConfig{
5927		Methods: m,
5928	}
5929	ch <- sc
5930	// Wait for the new service config to propagate.
5931	for {
5932		if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
5933			continue
5934		}
5935		break
5936	}
5937	// The following RPCs are expected to become fail-fast.
5938	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
5939		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
5940	}
5941}
5942
5943func TestServiceConfigWaitForReadyTD(t *testing.T) {
5944	defer leakcheck.Check(t)
5945	for _, e := range listTestEnv() {
5946		testServiceConfigWaitForReadyTD(t, e)
5947	}
5948}
5949
5950func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
5951	te, ch := testServiceConfigSetupTD(t, e)
5952	defer te.tearDown()
5953
5954	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
5955	mc := grpc.MethodConfig{
5956		WaitForReady: newBool(false),
5957		Timeout:      newDuration(time.Millisecond),
5958	}
5959	m := make(map[string]grpc.MethodConfig)
5960	m["/grpc.testing.TestService/EmptyCall"] = mc
5961	m["/grpc.testing.TestService/FullDuplexCall"] = mc
5962	sc := grpc.ServiceConfig{
5963		Methods: m,
5964	}
5965	ch <- sc
5966
5967	cc := te.clientConn()
5968	tc := testpb.NewTestServiceClient(cc)
5969	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5970	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
5971		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5972	}
5973	if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
5974		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
5975	}
5976
5977	// Generate a service config update.
5978	// Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
5979	mc.WaitForReady = newBool(true)
5980	m = make(map[string]grpc.MethodConfig)
5981	m["/grpc.testing.TestService/EmptyCall"] = mc
5982	m["/grpc.testing.TestService/FullDuplexCall"] = mc
5983	sc = grpc.ServiceConfig{
5984		Methods: m,
5985	}
5986	ch <- sc
5987
5988	// Wait for the new service config to take effect.
5989	mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
5990	for {
5991		if !*mc.WaitForReady {
5992			time.Sleep(100 * time.Millisecond)
5993			mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
5994			continue
5995		}
5996		break
5997	}
5998	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5999	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
6000		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
6001	}
6002	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
6003		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
6004	}
6005}
6006
6007func TestServiceConfigTimeoutTD(t *testing.T) {
6008	defer leakcheck.Check(t)
6009	for _, e := range listTestEnv() {
6010		testServiceConfigTimeoutTD(t, e)
6011	}
6012}
6013
6014func testServiceConfigTimeoutTD(t *testing.T, e env) {
6015	te, ch := testServiceConfigSetupTD(t, e)
6016	defer te.tearDown()
6017
6018	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
6019	mc := grpc.MethodConfig{
6020		Timeout: newDuration(time.Hour),
6021	}
6022	m := make(map[string]grpc.MethodConfig)
6023	m["/grpc.testing.TestService/EmptyCall"] = mc
6024	m["/grpc.testing.TestService/FullDuplexCall"] = mc
6025	sc := grpc.ServiceConfig{
6026		Methods: m,
6027	}
6028	ch <- sc
6029
6030	cc := te.clientConn()
6031	tc := testpb.NewTestServiceClient(cc)
6032	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
6033	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
6034	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
6035		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
6036	}
6037	cancel()
6038	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
6039	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
6040		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
6041	}
6042	cancel()
6043
6044	// Generate a service config update.
6045	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
6046	mc.Timeout = newDuration(time.Nanosecond)
6047	m = make(map[string]grpc.MethodConfig)
6048	m["/grpc.testing.TestService/EmptyCall"] = mc
6049	m["/grpc.testing.TestService/FullDuplexCall"] = mc
6050	sc = grpc.ServiceConfig{
6051		Methods: m,
6052	}
6053	ch <- sc
6054
6055	// Wait for the new service config to take effect.
6056	mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
6057	for {
6058		if *mc.Timeout != time.Nanosecond {
6059			time.Sleep(100 * time.Millisecond)
6060			mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
6061			continue
6062		}
6063		break
6064	}
6065
6066	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
6067	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
6068		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
6069	}
6070	cancel()
6071
6072	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
6073	if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); status.Code(err) != codes.DeadlineExceeded {
6074		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
6075	}
6076	cancel()
6077}
6078
6079func TestServiceConfigMaxMsgSizeTD(t *testing.T) {
6080	defer leakcheck.Check(t)
6081	for _, e := range listTestEnv() {
6082		testServiceConfigMaxMsgSizeTD(t, e)
6083	}
6084}
6085
6086func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
6087	// Setting up values and objects shared across all test cases.
6088	const smallSize = 1
6089	const largeSize = 1024
6090	const extraLargeSize = 2048
6091
6092	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
6093	if err != nil {
6094		t.Fatal(err)
6095	}
6096	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
6097	if err != nil {
6098		t.Fatal(err)
6099	}
6100	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
6101	if err != nil {
6102		t.Fatal(err)
6103	}
6104
6105	mc := grpc.MethodConfig{
6106		MaxReqSize:  newInt(extraLargeSize),
6107		MaxRespSize: newInt(extraLargeSize),
6108	}
6109
6110	m := make(map[string]grpc.MethodConfig)
6111	m["/grpc.testing.TestService/UnaryCall"] = mc
6112	m["/grpc.testing.TestService/FullDuplexCall"] = mc
6113	sc := grpc.ServiceConfig{
6114		Methods: m,
6115	}
6116	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
6117	te1, ch1 := testServiceConfigSetupTD(t, e)
6118	te1.startServer(&testServer{security: e.security})
6119	defer te1.tearDown()
6120
6121	ch1 <- sc
6122	tc := testpb.NewTestServiceClient(te1.clientConn())
6123
6124	req := &testpb.SimpleRequest{
6125		ResponseType: testpb.PayloadType_COMPRESSABLE,
6126		ResponseSize: int32(extraLargeSize),
6127		Payload:      smallPayload,
6128	}
6129	// Test for unary RPC recv.
6130	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6131		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6132	}
6133
6134	// Test for unary RPC send.
6135	req.Payload = extraLargePayload
6136	req.ResponseSize = int32(smallSize)
6137	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6138		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6139	}
6140
6141	// Test for streaming RPC recv.
6142	respParam := []*testpb.ResponseParameters{
6143		{
6144			Size: int32(extraLargeSize),
6145		},
6146	}
6147	sreq := &testpb.StreamingOutputCallRequest{
6148		ResponseType:       testpb.PayloadType_COMPRESSABLE,
6149		ResponseParameters: respParam,
6150		Payload:            smallPayload,
6151	}
6152	stream, err := tc.FullDuplexCall(te1.ctx)
6153	if err != nil {
6154		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6155	}
6156	if err := stream.Send(sreq); err != nil {
6157		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6158	}
6159	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
6160		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
6161	}
6162
6163	// Test for streaming RPC send.
6164	respParam[0].Size = int32(smallSize)
6165	sreq.Payload = extraLargePayload
6166	stream, err = tc.FullDuplexCall(te1.ctx)
6167	if err != nil {
6168		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6169	}
6170	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
6171		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
6172	}
6173
6174	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
6175	te2, ch2 := testServiceConfigSetupTD(t, e)
6176	te2.maxClientReceiveMsgSize = newInt(1024)
6177	te2.maxClientSendMsgSize = newInt(1024)
6178	te2.startServer(&testServer{security: e.security})
6179	defer te2.tearDown()
6180	ch2 <- sc
6181	tc = testpb.NewTestServiceClient(te2.clientConn())
6182
6183	// Test for unary RPC recv.
6184	req.Payload = smallPayload
6185	req.ResponseSize = int32(largeSize)
6186
6187	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6188		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6189	}
6190
6191	// Test for unary RPC send.
6192	req.Payload = largePayload
6193	req.ResponseSize = int32(smallSize)
6194	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6195		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6196	}
6197
6198	// Test for streaming RPC recv.
6199	stream, err = tc.FullDuplexCall(te2.ctx)
6200	respParam[0].Size = int32(largeSize)
6201	sreq.Payload = smallPayload
6202	if err != nil {
6203		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6204	}
6205	if err := stream.Send(sreq); err != nil {
6206		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6207	}
6208	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
6209		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
6210	}
6211
6212	// Test for streaming RPC send.
6213	respParam[0].Size = int32(smallSize)
6214	sreq.Payload = largePayload
6215	stream, err = tc.FullDuplexCall(te2.ctx)
6216	if err != nil {
6217		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6218	}
6219	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
6220		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
6221	}
6222
6223	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
6224	te3, ch3 := testServiceConfigSetupTD(t, e)
6225	te3.maxClientReceiveMsgSize = newInt(4096)
6226	te3.maxClientSendMsgSize = newInt(4096)
6227	te3.startServer(&testServer{security: e.security})
6228	defer te3.tearDown()
6229	ch3 <- sc
6230	tc = testpb.NewTestServiceClient(te3.clientConn())
6231
6232	// Test for unary RPC recv.
6233	req.Payload = smallPayload
6234	req.ResponseSize = int32(largeSize)
6235
6236	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
6237		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
6238	}
6239
6240	req.ResponseSize = int32(extraLargeSize)
6241	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6242		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6243	}
6244
6245	// Test for unary RPC send.
6246	req.Payload = largePayload
6247	req.ResponseSize = int32(smallSize)
6248	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
6249		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
6250	}
6251
6252	req.Payload = extraLargePayload
6253	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6254		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6255	}
6256
6257	// Test for streaming RPC recv.
6258	stream, err = tc.FullDuplexCall(te3.ctx)
6259	if err != nil {
6260		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6261	}
6262	respParam[0].Size = int32(largeSize)
6263	sreq.Payload = smallPayload
6264
6265	if err := stream.Send(sreq); err != nil {
6266		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6267	}
6268	if _, err := stream.Recv(); err != nil {
6269		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
6270	}
6271
6272	respParam[0].Size = int32(extraLargeSize)
6273
6274	if err := stream.Send(sreq); err != nil {
6275		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6276	}
6277	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
6278		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
6279	}
6280
6281	// Test for streaming RPC send.
6282	respParam[0].Size = int32(smallSize)
6283	sreq.Payload = largePayload
6284	stream, err = tc.FullDuplexCall(te3.ctx)
6285	if err != nil {
6286		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6287	}
6288	if err := stream.Send(sreq); err != nil {
6289		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6290	}
6291	sreq.Payload = extraLargePayload
6292	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
6293		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
6294	}
6295}
6296
6297func TestMethodFromServerStream(t *testing.T) {
6298	defer leakcheck.Check(t)
6299	const testMethod = "/package.service/method"
6300	e := tcpClearRREnv
6301	te := newTest(t, e)
6302	var method string
6303	var ok bool
6304	te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error {
6305		method, ok = grpc.MethodFromServerStream(stream)
6306		return nil
6307	}
6308
6309	te.startServer(nil)
6310	defer te.tearDown()
6311	_ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil)
6312	if !ok || method != testMethod {
6313		t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)
6314	}
6315}
6316
6317func TestInterceptorCanAccessCallOptions(t *testing.T) {
6318	defer leakcheck.Check(t)
6319	e := tcpClearRREnv
6320	te := newTest(t, e)
6321	te.startServer(&testServer{security: e.security})
6322	defer te.tearDown()
6323
6324	type observedOptions struct {
6325		headers     []*metadata.MD
6326		trailers    []*metadata.MD
6327		peer        []*peer.Peer
6328		creds       []credentials.PerRPCCredentials
6329		failFast    []bool
6330		maxRecvSize []int
6331		maxSendSize []int
6332		compressor  []string
6333		subtype     []string
6334	}
6335	var observedOpts observedOptions
6336	populateOpts := func(opts []grpc.CallOption) {
6337		for _, o := range opts {
6338			switch o := o.(type) {
6339			case grpc.HeaderCallOption:
6340				observedOpts.headers = append(observedOpts.headers, o.HeaderAddr)
6341			case grpc.TrailerCallOption:
6342				observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr)
6343			case grpc.PeerCallOption:
6344				observedOpts.peer = append(observedOpts.peer, o.PeerAddr)
6345			case grpc.PerRPCCredsCallOption:
6346				observedOpts.creds = append(observedOpts.creds, o.Creds)
6347			case grpc.FailFastCallOption:
6348				observedOpts.failFast = append(observedOpts.failFast, o.FailFast)
6349			case grpc.MaxRecvMsgSizeCallOption:
6350				observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize)
6351			case grpc.MaxSendMsgSizeCallOption:
6352				observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize)
6353			case grpc.CompressorCallOption:
6354				observedOpts.compressor = append(observedOpts.compressor, o.CompressorType)
6355			case grpc.ContentSubtypeCallOption:
6356				observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype)
6357			}
6358		}
6359	}
6360
6361	te.unaryClientInt = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
6362		populateOpts(opts)
6363		return nil
6364	}
6365	te.streamClientInt = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
6366		populateOpts(opts)
6367		return nil, nil
6368	}
6369
6370	defaults := []grpc.CallOption{
6371		grpc.FailFast(false),
6372		grpc.MaxCallRecvMsgSize(1010),
6373	}
6374	tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))
6375
6376	var headers metadata.MD
6377	var trailers metadata.MD
6378	var pr peer.Peer
6379	tc.UnaryCall(context.Background(), &testpb.SimpleRequest{},
6380		grpc.MaxCallRecvMsgSize(100),
6381		grpc.MaxCallSendMsgSize(200),
6382		grpc.PerRPCCredentials(testPerRPCCredentials{}),
6383		grpc.Header(&headers),
6384		grpc.Trailer(&trailers),
6385		grpc.Peer(&pr))
6386	expected := observedOptions{
6387		failFast:    []bool{false},
6388		maxRecvSize: []int{1010, 100},
6389		maxSendSize: []int{200},
6390		creds:       []credentials.PerRPCCredentials{testPerRPCCredentials{}},
6391		headers:     []*metadata.MD{&headers},
6392		trailers:    []*metadata.MD{&trailers},
6393		peer:        []*peer.Peer{&pr},
6394	}
6395
6396	if !reflect.DeepEqual(expected, observedOpts) {
6397		t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
6398	}
6399
6400	observedOpts = observedOptions{} // reset
6401
6402	tc.StreamingInputCall(context.Background(),
6403		grpc.FailFast(true),
6404		grpc.MaxCallSendMsgSize(2020),
6405		grpc.UseCompressor("comp-type"),
6406		grpc.CallContentSubtype("json"))
6407	expected = observedOptions{
6408		failFast:    []bool{false, true},
6409		maxRecvSize: []int{1010},
6410		maxSendSize: []int{2020},
6411		compressor:  []string{"comp-type"},
6412		subtype:     []string{"json"},
6413	}
6414
6415	if !reflect.DeepEqual(expected, observedOpts) {
6416		t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
6417	}
6418}
6419
6420func TestCompressorRegister(t *testing.T) {
6421	defer leakcheck.Check(t)
6422	for _, e := range listTestEnv() {
6423		testCompressorRegister(t, e)
6424	}
6425}
6426
6427func testCompressorRegister(t *testing.T, e env) {
6428	te := newTest(t, e)
6429	te.clientCompression = false
6430	te.serverCompression = false
6431	te.clientUseCompression = true
6432
6433	te.startServer(&testServer{security: e.security})
6434	defer te.tearDown()
6435	tc := testpb.NewTestServiceClient(te.clientConn())
6436
6437	// Unary call
6438	const argSize = 271828
6439	const respSize = 314159
6440	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
6441	if err != nil {
6442		t.Fatal(err)
6443	}
6444	req := &testpb.SimpleRequest{
6445		ResponseType: testpb.PayloadType_COMPRESSABLE,
6446		ResponseSize: respSize,
6447		Payload:      payload,
6448	}
6449	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
6450	if _, err := tc.UnaryCall(ctx, req); err != nil {
6451		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
6452	}
6453	// Streaming RPC
6454	ctx, cancel := context.WithCancel(context.Background())
6455	defer cancel()
6456	stream, err := tc.FullDuplexCall(ctx)
6457	if err != nil {
6458		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6459	}
6460	respParam := []*testpb.ResponseParameters{
6461		{
6462			Size: 31415,
6463		},
6464	}
6465	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
6466	if err != nil {
6467		t.Fatal(err)
6468	}
6469	sreq := &testpb.StreamingOutputCallRequest{
6470		ResponseType:       testpb.PayloadType_COMPRESSABLE,
6471		ResponseParameters: respParam,
6472		Payload:            payload,
6473	}
6474	if err := stream.Send(sreq); err != nil {
6475		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6476	}
6477	if _, err := stream.Recv(); err != nil {
6478		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
6479	}
6480}
6481
6482func TestServeExitsWhenListenerClosed(t *testing.T) {
6483	defer leakcheck.Check(t)
6484
6485	ss := &stubServer{
6486		emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
6487			return &testpb.Empty{}, nil
6488		},
6489	}
6490
6491	s := grpc.NewServer()
6492	testpb.RegisterTestServiceServer(s, ss)
6493
6494	lis, err := net.Listen("tcp", "localhost:0")
6495	if err != nil {
6496		t.Fatalf("Failed to create listener: %v", err)
6497	}
6498
6499	done := make(chan struct{})
6500	go func() {
6501		s.Serve(lis)
6502		close(done)
6503	}()
6504
6505	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
6506	if err != nil {
6507		t.Fatalf("Failed to dial server: %v", err)
6508	}
6509	defer cc.Close()
6510	c := testpb.NewTestServiceClient(cc)
6511	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6512	defer cancel()
6513	if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
6514		t.Fatalf("Failed to send test RPC to server: %v", err)
6515	}
6516
6517	if err := lis.Close(); err != nil {
6518		t.Fatalf("Failed to close listener: %v", err)
6519	}
6520	const timeout = 5 * time.Second
6521	timer := time.NewTimer(timeout)
6522	select {
6523	case <-done:
6524		return
6525	case <-timer.C:
6526		t.Fatalf("Serve did not return after %v", timeout)
6527	}
6528}
6529
6530// Service handler returns status with invalid utf8 message.
6531func TestStatusInvalidUTF8Message(t *testing.T) {
6532	defer leakcheck.Check(t)
6533
6534	var (
6535		origMsg = string([]byte{0xff, 0xfe, 0xfd})
6536		wantMsg = "���"
6537	)
6538
6539	ss := &stubServer{
6540		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
6541			return nil, status.Errorf(codes.Internal, origMsg)
6542		},
6543	}
6544	if err := ss.Start(nil); err != nil {
6545		t.Fatalf("Error starting endpoint server: %v", err)
6546	}
6547	defer ss.Stop()
6548
6549	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
6550	defer cancel()
6551
6552	if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg {
6553		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg)
6554	}
6555}
6556
6557// Service handler returns status with details and invalid utf8 message. Proto
6558// will fail to marshal the status because of the invalid utf8 message. Details
6559// will be dropped when sending.
6560func TestStatusInvalidUTF8Details(t *testing.T) {
6561	defer leakcheck.Check(t)
6562
6563	var (
6564		origMsg = string([]byte{0xff, 0xfe, 0xfd})
6565		wantMsg = "���"
6566	)
6567
6568	ss := &stubServer{
6569		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
6570			st := status.New(codes.Internal, origMsg)
6571			st, err := st.WithDetails(&testpb.Empty{})
6572			if err != nil {
6573				return nil, err
6574			}
6575			return nil, st.Err()
6576		},
6577	}
6578	if err := ss.Start(nil); err != nil {
6579		t.Fatalf("Error starting endpoint server: %v", err)
6580	}
6581	defer ss.Stop()
6582
6583	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
6584	defer cancel()
6585
6586	_, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
6587	st := status.Convert(err)
6588	if st.Message() != wantMsg {
6589		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg)
6590	}
6591	if len(st.Details()) != 0 {
6592		// Details should be dropped on the server side.
6593		t.Fatalf("RPC status contain details: %v, want no details", st.Details())
6594	}
6595}
6596
6597func TestClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T) {
6598	defer leakcheck.Check(t)
6599	for _, e := range listTestEnv() {
6600		if e.httpHandler {
6601			continue
6602		}
6603		testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t, e)
6604	}
6605}
6606
6607func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e env) {
6608	te := newTest(t, e)
6609	te.userAgent = testAppUA
6610	smallSize := 1024
6611	te.maxServerReceiveMsgSize = &smallSize
6612	te.startServer(&testServer{security: e.security})
6613	defer te.tearDown()
6614	tc := testpb.NewTestServiceClient(te.clientConn())
6615	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576)
6616	if err != nil {
6617		t.Fatal(err)
6618	}
6619	req := &testpb.SimpleRequest{
6620		ResponseType: testpb.PayloadType_COMPRESSABLE,
6621		Payload:      payload,
6622	}
6623	var wg sync.WaitGroup
6624	for i := 0; i < 10; i++ {
6625		wg.Add(1)
6626		go func() {
6627			defer wg.Done()
6628			for j := 0; j < 100; j++ {
6629				ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
6630				defer cancel()
6631				if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted {
6632					t.Errorf("TestService/UnaryCall(_,_) = _. %v, want code: %s", err, codes.ResourceExhausted)
6633					return
6634				}
6635			}
6636		}()
6637	}
6638	wg.Wait()
6639}
6640
6641const clientAlwaysFailCredErrorMsg = "clientAlwaysFailCred always fails"
6642
6643var errClientAlwaysFailCred = errors.New(clientAlwaysFailCredErrorMsg)
6644
6645type clientAlwaysFailCred struct{}
6646
6647func (c clientAlwaysFailCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
6648	return nil, nil, errClientAlwaysFailCred
6649}
6650func (c clientAlwaysFailCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
6651	return rawConn, nil, nil
6652}
6653func (c clientAlwaysFailCred) Info() credentials.ProtocolInfo {
6654	return credentials.ProtocolInfo{}
6655}
6656func (c clientAlwaysFailCred) Clone() credentials.TransportCredentials {
6657	return nil
6658}
6659func (c clientAlwaysFailCred) OverrideServerName(s string) error {
6660	return nil
6661}
6662
6663func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
6664	te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: "round_robin"})
6665	te.startServer(&testServer{security: te.e.security})
6666	defer te.tearDown()
6667
6668	opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})}
6669	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
6670	defer cancel()
6671	cc, err := grpc.DialContext(ctx, te.srvAddr, opts...)
6672	if err != nil {
6673		t.Fatalf("Dial(_) = %v, want %v", err, nil)
6674	}
6675	defer cc.Close()
6676
6677	tc := testpb.NewTestServiceClient(cc)
6678	for i := 0; i < 1000; i++ {
6679		// This loop runs for at most 1 second. The first several RPCs will fail
6680		// with Unavailable because the connection hasn't started. When the
6681		// first connection failed with creds error, the next RPC should also
6682		// fail with the expected error.
6683		if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
6684			return
6685		}
6686		time.Sleep(time.Millisecond)
6687	}
6688	te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
6689}
6690
6691func TestRPCTimeout(t *testing.T) {
6692	defer leakcheck.Check(t)
6693	for _, e := range listTestEnv() {
6694		testRPCTimeout(t, e)
6695	}
6696}
6697
6698func testRPCTimeout(t *testing.T, e env) {
6699	te := newTest(t, e)
6700	te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
6701	defer te.tearDown()
6702
6703	cc := te.clientConn()
6704	tc := testpb.NewTestServiceClient(cc)
6705
6706	const argSize = 2718
6707	const respSize = 314
6708
6709	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
6710	if err != nil {
6711		t.Fatal(err)
6712	}
6713
6714	req := &testpb.SimpleRequest{
6715		ResponseType: testpb.PayloadType_COMPRESSABLE,
6716		ResponseSize: respSize,
6717		Payload:      payload,
6718	}
6719	for i := -1; i <= 10; i++ {
6720		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
6721		if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
6722			t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
6723		}
6724		cancel()
6725	}
6726}
6727
6728func TestDisabledIOBuffers(t *testing.T) {
6729	defer leakcheck.Check(t)
6730
6731	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
6732	if err != nil {
6733		t.Fatalf("Failed to create payload: %v", err)
6734	}
6735	req := &testpb.StreamingOutputCallRequest{
6736		Payload: payload,
6737	}
6738	resp := &testpb.StreamingOutputCallResponse{
6739		Payload: payload,
6740	}
6741
6742	ss := &stubServer{
6743		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
6744			for {
6745				in, err := stream.Recv()
6746				if err == io.EOF {
6747					return nil
6748				}
6749				if err != nil {
6750					t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
6751					return err
6752				}
6753				if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
6754					t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
6755					return err
6756				}
6757				if err := stream.Send(resp); err != nil {
6758					t.Errorf("stream.Send(_)= %v, want <nil>", err)
6759					return err
6760				}
6761
6762			}
6763		},
6764	}
6765
6766	s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
6767	testpb.RegisterTestServiceServer(s, ss)
6768
6769	lis, err := net.Listen("tcp", "localhost:0")
6770	if err != nil {
6771		t.Fatalf("Failed to create listener: %v", err)
6772	}
6773
6774	done := make(chan struct{})
6775	go func() {
6776		s.Serve(lis)
6777		close(done)
6778	}()
6779	defer s.Stop()
6780	dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second)
6781	defer dcancel()
6782	cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
6783	if err != nil {
6784		t.Fatalf("Failed to dial server")
6785	}
6786	defer cc.Close()
6787	c := testpb.NewTestServiceClient(cc)
6788	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6789	defer cancel()
6790	stream, err := c.FullDuplexCall(ctx, grpc.FailFast(false))
6791	if err != nil {
6792		t.Fatalf("Failed to send test RPC to server")
6793	}
6794	for i := 0; i < 10; i++ {
6795		if err := stream.Send(req); err != nil {
6796			t.Fatalf("stream.Send(_) = %v, want <nil>", err)
6797		}
6798		in, err := stream.Recv()
6799		if err != nil {
6800			t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
6801		}
6802		if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
6803			t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
6804		}
6805	}
6806	stream.CloseSend()
6807	if _, err := stream.Recv(); err != io.EOF {
6808		t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
6809	}
6810}
6811
6812func TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) {
6813	defer leakcheck.Check(t)
6814	for _, e := range listTestEnv() {
6815		if e.httpHandler {
6816			continue
6817		}
6818		testServerMaxHeaderListSizeClientUserViolation(t, e)
6819	}
6820}
6821
6822func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) {
6823	te := newTest(t, e)
6824	te.maxServerHeaderListSize = new(uint32)
6825	*te.maxServerHeaderListSize = 216
6826	te.startServer(&testServer{security: e.security})
6827	defer te.tearDown()
6828
6829	cc := te.clientConn()
6830	tc := testpb.NewTestServiceClient(cc)
6831	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6832	defer cancel()
6833	metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216)))
6834	var err error
6835	if err = verifyResultWithDelay(func() (bool, error) {
6836		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
6837			return true, nil
6838		}
6839		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
6840	}); err != nil {
6841		t.Fatal(err)
6842	}
6843}
6844
6845func TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) {
6846	defer leakcheck.Check(t)
6847	for _, e := range listTestEnv() {
6848		if e.httpHandler {
6849			continue
6850		}
6851		testClientMaxHeaderListSizeServerUserViolation(t, e)
6852	}
6853}
6854
6855func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) {
6856	te := newTest(t, e)
6857	te.maxClientHeaderListSize = new(uint32)
6858	*te.maxClientHeaderListSize = 1 // any header server sends will violate
6859	te.startServer(&testServer{security: e.security})
6860	defer te.tearDown()
6861
6862	cc := te.clientConn()
6863	tc := testpb.NewTestServiceClient(cc)
6864	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6865	defer cancel()
6866	var err error
6867	if err = verifyResultWithDelay(func() (bool, error) {
6868		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
6869			return true, nil
6870		}
6871		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
6872	}); err != nil {
6873		t.Fatal(err)
6874	}
6875}
6876
6877func TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) {
6878	defer leakcheck.Check(t)
6879	for _, e := range listTestEnv() {
6880		if e.httpHandler || e.security == "tls" {
6881			continue
6882		}
6883		testServerMaxHeaderListSizeClientIntentionalViolation(t, e)
6884	}
6885}
6886
6887func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) {
6888	te := newTest(t, e)
6889	te.maxServerHeaderListSize = new(uint32)
6890	*te.maxServerHeaderListSize = 512
6891	te.startServer(&testServer{security: e.security})
6892	defer te.tearDown()
6893
6894	cc, dw := te.clientConnWithConnControl()
6895	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
6896	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6897	defer cancel()
6898	stream, err := tc.FullDuplexCall(ctx)
6899	if err != nil {
6900		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
6901	}
6902	rcw := dw.getRawConnWrapper()
6903	val := make([]string, 512)
6904	for i := range val {
6905		val[i] = "a"
6906	}
6907	// allow for client to send the initial header
6908	time.Sleep(100 * time.Millisecond)
6909	rcw.writeHeaders(http2.HeadersFrameParam{
6910		StreamID:      tc.getCurrentStreamID(),
6911		BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")),
6912		EndStream:     false,
6913		EndHeaders:    true,
6914	})
6915	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
6916		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
6917	}
6918}
6919
6920func TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) {
6921	defer leakcheck.Check(t)
6922	for _, e := range listTestEnv() {
6923		if e.httpHandler || e.security == "tls" {
6924			continue
6925		}
6926		testClientMaxHeaderListSizeServerIntentionalViolation(t, e)
6927	}
6928}
6929
6930func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) {
6931	te := newTest(t, e)
6932	te.maxClientHeaderListSize = new(uint32)
6933	*te.maxClientHeaderListSize = 200
6934	lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true})
6935	defer te.tearDown()
6936	cc, _ := te.clientConnWithConnControl()
6937	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
6938	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6939	defer cancel()
6940	stream, err := tc.FullDuplexCall(ctx)
6941	if err != nil {
6942		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
6943	}
6944	var i int
6945	var rcw *rawConnWrapper
6946	for i = 0; i < 100; i++ {
6947		rcw = lw.getLastConn()
6948		if rcw != nil {
6949			break
6950		}
6951		time.Sleep(10 * time.Millisecond)
6952		continue
6953	}
6954	if i == 100 {
6955		t.Fatalf("failed to create server transport after 1s")
6956	}
6957
6958	val := make([]string, 200)
6959	for i := range val {
6960		val[i] = "a"
6961	}
6962	// allow for client to send the initial header.
6963	time.Sleep(100 * time.Millisecond)
6964	rcw.writeHeaders(http2.HeadersFrameParam{
6965		StreamID:      tc.getCurrentStreamID(),
6966		BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")),
6967		EndStream:     false,
6968		EndHeaders:    true,
6969	})
6970	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
6971		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
6972	}
6973}
6974
6975func TestNetPipeConn(t *testing.T) {
6976	// This test will block indefinitely if grpc writes both client and server
6977	// prefaces without either reading from the Conn.
6978	defer leakcheck.Check(t)
6979	pl := testutils.NewPipeListener()
6980	s := grpc.NewServer()
6981	defer s.Stop()
6982	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6983		return &testpb.SimpleResponse{}, nil
6984	}}
6985	testpb.RegisterTestServiceServer(s, ts)
6986	go s.Serve(pl)
6987	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6988	defer cancel()
6989	cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithDialer(pl.Dialer()))
6990	if err != nil {
6991		t.Fatalf("Error creating client: %v", err)
6992	}
6993	defer cc.Close()
6994	client := testpb.NewTestServiceClient(cc)
6995	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6996		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
6997	}
6998}
6999
7000func TestLargeTimeout(t *testing.T) {
7001	defer leakcheck.Check(t)
7002	for _, e := range listTestEnv() {
7003		testLargeTimeout(t, e)
7004	}
7005}
7006
7007func testLargeTimeout(t *testing.T, e env) {
7008	te := newTest(t, e)
7009	te.declareLogNoise("Server.processUnaryRPC failed to write status")
7010
7011	ts := &funcServer{}
7012	te.startServer(ts)
7013	defer te.tearDown()
7014	tc := testpb.NewTestServiceClient(te.clientConn())
7015
7016	timeouts := []time.Duration{
7017		time.Duration(math.MaxInt64), // will be (correctly) converted to
7018		// 2562048 hours, which overflows upon converting back to an int64
7019		2562047 * time.Hour, // the largest timeout that does not overflow
7020	}
7021
7022	for i, maxTimeout := range timeouts {
7023		ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
7024			deadline, ok := ctx.Deadline()
7025			timeout := deadline.Sub(time.Now())
7026			minTimeout := maxTimeout - 5*time.Second
7027			if !ok || timeout < minTimeout || timeout > maxTimeout {
7028				t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
7029				return nil, status.Error(codes.OutOfRange, "deadline error")
7030			}
7031			return &testpb.SimpleResponse{}, nil
7032		}
7033
7034		ctx, cancel := context.WithTimeout(context.Background(), maxTimeout)
7035		defer cancel()
7036
7037		if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
7038			t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err)
7039		}
7040	}
7041}
7042
7043// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
7044// test ensures that the connection is re-created after GO_AWAY and not affected by the
7045// subsequent (old) connection closure.
7046func TestGoAwayThenClose(t *testing.T) {
7047	defer leakcheck.Check(t)
7048
7049	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
7050	defer cancel()
7051
7052	lis1, err := net.Listen("tcp", "localhost:0")
7053	if err != nil {
7054		t.Fatalf("Error while listening. Err: %v", err)
7055	}
7056	s1 := grpc.NewServer()
7057	defer s1.Stop()
7058	ts1 := &funcServer{
7059		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
7060			return &testpb.SimpleResponse{}, nil
7061		},
7062		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
7063			// Wait forever.
7064			_, err := stream.Recv()
7065			if err == nil {
7066				t.Error("expected to never receive any message")
7067			}
7068			return err
7069		},
7070	}
7071	testpb.RegisterTestServiceServer(s1, ts1)
7072	go s1.Serve(lis1)
7073
7074	conn2Established := grpcsync.NewEvent()
7075	lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
7076	if err != nil {
7077		t.Fatalf("Error while listening. Err: %v", err)
7078	}
7079	s2 := grpc.NewServer()
7080	defer s2.Stop()
7081	conn2Ready := grpcsync.NewEvent()
7082	ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
7083		conn2Ready.Fire()
7084		return &testpb.SimpleResponse{}, nil
7085	}}
7086	testpb.RegisterTestServiceServer(s2, ts2)
7087	go s2.Serve(lis2)
7088
7089	r, rcleanup := manual.GenerateAndRegisterManualResolver()
7090	defer rcleanup()
7091	r.InitialAddrs([]resolver.Address{
7092		{Addr: lis1.Addr().String()},
7093		{Addr: lis2.Addr().String()},
7094	})
7095	cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
7096	if err != nil {
7097		t.Fatalf("Error creating client: %v", err)
7098	}
7099	defer cc.Close()
7100
7101	client := testpb.NewTestServiceClient(cc)
7102
7103	// Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the
7104	// connection doesn't get closed until the server stops and the client receives.
7105	stream, err := client.FullDuplexCall(ctx)
7106	if err != nil {
7107		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
7108	}
7109
7110	// Send GO_AWAY to connection 1.
7111	go s1.GracefulStop()
7112
7113	// Wait for connection 2 to be established.
7114	<-conn2Established.Done()
7115
7116	// Close connection 1.
7117	s1.Stop()
7118
7119	// Wait for client to close.
7120	_, err = stream.Recv()
7121	if err == nil {
7122		t.Fatal("expected the stream to die, but got a successful Recv")
7123	}
7124
7125	// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
7126	for i := 0; i < 10; i++ {
7127		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
7128			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
7129		}
7130	}
7131}
7132
7133func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
7134	lis, err := net.Listen(network, address)
7135	if err != nil {
7136		return nil, err
7137	}
7138	return notifyingListener{connEstablished: event, Listener: lis}, nil
7139}
7140
7141type notifyingListener struct {
7142	connEstablished *grpcsync.Event
7143	net.Listener
7144}
7145
7146func (lis notifyingListener) Accept() (net.Conn, error) {
7147	defer lis.connEstablished.Fire()
7148	return lis.Listener.Accept()
7149}
7150
7151func TestRPCWaitsForResolver(t *testing.T) {
7152	te := testServiceConfigSetup(t, tcpClearRREnv)
7153	te.startServer(&testServer{security: tcpClearRREnv.security})
7154	defer te.tearDown()
7155	r, rcleanup := manual.GenerateAndRegisterManualResolver()
7156	defer rcleanup()
7157
7158	te.resolverScheme = r.Scheme()
7159	te.nonBlockingDial = true
7160	cc := te.clientConn()
7161	tc := testpb.NewTestServiceClient(cc)
7162
7163	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
7164	defer cancel()
7165	// With no resolved addresses yet, this will timeout.
7166	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
7167		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
7168	}
7169
7170	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
7171	defer cancel()
7172	go func() {
7173		time.Sleep(time.Second)
7174		r.NewServiceConfig(`{
7175		    "methodConfig": [
7176		        {
7177		            "name": [
7178		                {
7179		                    "service": "grpc.testing.TestService",
7180		                    "method": "UnaryCall"
7181		                }
7182		            ],
7183                    "maxRequestMessageBytes": 0
7184		        }
7185		    ]
7186		}`)
7187		r.NewAddress([]resolver.Address{{Addr: te.srvAddr}})
7188	}()
7189	// We wait a second before providing a service config and resolving
7190	// addresses.  So this will wait for that and then honor the
7191	// maxRequestMessageBytes it contains.
7192	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{ResponseType: testpb.PayloadType_UNCOMPRESSABLE}); status.Code(err) != codes.ResourceExhausted {
7193		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
7194	}
7195	if got := ctx.Err(); got != nil {
7196		t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got)
7197	}
7198	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
7199		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
7200	}
7201}
7202