1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package test
20
21import (
22	"bufio"
23	"bytes"
24	"compress/gzip"
25	"context"
26	"crypto/tls"
27	"errors"
28	"flag"
29	"fmt"
30	"io"
31	"math"
32	"net"
33	"net/http"
34	"os"
35	"reflect"
36	"runtime"
37	"strings"
38	"sync"
39	"sync/atomic"
40	"syscall"
41	"testing"
42	"time"
43
44	"github.com/golang/protobuf/proto"
45	anypb "github.com/golang/protobuf/ptypes/any"
46	"golang.org/x/net/http2"
47	"golang.org/x/net/http2/hpack"
48	spb "google.golang.org/genproto/googleapis/rpc/status"
49	"google.golang.org/grpc"
50	"google.golang.org/grpc/codes"
51	"google.golang.org/grpc/connectivity"
52	"google.golang.org/grpc/credentials"
53	"google.golang.org/grpc/encoding"
54	_ "google.golang.org/grpc/encoding/gzip"
55	"google.golang.org/grpc/health"
56	healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
57	healthpb "google.golang.org/grpc/health/grpc_health_v1"
58	"google.golang.org/grpc/internal/channelz"
59	"google.golang.org/grpc/internal/grpcsync"
60	"google.golang.org/grpc/internal/grpctest"
61	"google.golang.org/grpc/internal/testutils"
62	"google.golang.org/grpc/internal/transport"
63	"google.golang.org/grpc/metadata"
64	"google.golang.org/grpc/peer"
65	"google.golang.org/grpc/resolver"
66	"google.golang.org/grpc/resolver/manual"
67	"google.golang.org/grpc/serviceconfig"
68	"google.golang.org/grpc/stats"
69	"google.golang.org/grpc/status"
70	"google.golang.org/grpc/tap"
71	testpb "google.golang.org/grpc/test/grpc_testing"
72	"google.golang.org/grpc/testdata"
73)
74
// defaultHealthService is the fully-qualified name of the standard gRPC
// health-checking service used by tests that exercise health semantics.
const defaultHealthService = "grpc.health.v1.Health"
76
// init enables channelz data collection for the whole test binary so that
// individual tests can inspect channelz state without turning it on themselves.
func init() {
	channelz.TurnOn()
}
80
// s embeds grpctest.Tester; methods on s named TestXxx are discovered and run
// as subtests by grpctest.RunSubTests (see Test).
type s struct {
	grpctest.Tester
}
84
// Test is the single "go test" entry point for this package; it delegates to
// grpctest, which runs every (s) TestXxx method as a subtest.
func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}
88
var (
	// Header metadata sent by tests; the "-bin" suffixed keys carry binary
	// values, exercising binary-header encoding on the wire.
	testMetadata = metadata.MD{
		"key1":     []string{"value1"},
		"key2":     []string{"value2"},
		"key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})},
	}
	testMetadata2 = metadata.MD{
		"key1": []string{"value12"},
		"key2": []string{"value22"},
	}
	// Trailer metadata set by the test server; again with a binary key.
	testTrailerMetadata = metadata.MD{
		"tkey1":     []string{"trailerValue1"},
		"tkey2":     []string{"trailerValue2"},
		"tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})},
	}
	testTrailerMetadata2 = metadata.MD{
		"tkey1": []string{"trailerValue12"},
		"tkey2": []string{"trailerValue22"},
	}
	// capital "Key" is illegal in HTTP/2; used to test rejection of
	// malformed metadata.
	malformedHTTP2Metadata = metadata.MD{
		"Key": []string{"foo"},
	}
	testAppUA = "myApp1/1.0 myApp2/0.9"
	// failAppUA is a user-agent prefix that makes the test server fail the
	// RPC with detailedError, so clients can check error propagation.
	failAppUA     = "fail-this-RPC"
	detailedError = status.ErrorProto(&spb.Status{
		Code:    int32(codes.DataLoss),
		Message: "error for testing: " + failAppUA,
		Details: []*anypb.Any{{
			TypeUrl: "url",
			Value:   []byte{6, 0, 0, 6, 1, 3},
		}},
	})
)

var raceMode bool // set by race.go in race mode
127
// testServer implements the TestService used by the end-to-end tests. Its
// fields are knobs that change server-side behavior per test.
type testServer struct {
	testpb.UnimplementedTestServiceServer

	security           string        // indicate the authentication protocol used by this server.
	earlyFail          bool          // whether to error out the execution of a service handler prematurely.
	setAndSendHeader   bool          // whether to call setHeader and sendHeader.
	setHeaderOnly      bool          // whether to only call setHeader, not sendHeader.
	multipleSetTrailer bool          // whether to call setTrailer multiple times.
	unaryCallSleepTime time.Duration // artificial delay inserted into UnaryCall.
}
138
139func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
140	if md, ok := metadata.FromIncomingContext(ctx); ok {
141		// For testing purpose, returns an error if user-agent is failAppUA.
142		// To test that client gets the correct error.
143		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
144			return nil, detailedError
145		}
146		var str []string
147		for _, entry := range md["user-agent"] {
148			str = append(str, "ua", entry)
149		}
150		grpc.SendHeader(ctx, metadata.Pairs(str...))
151	}
152	return new(testpb.Empty), nil
153}
154
155func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
156	if size < 0 {
157		return nil, fmt.Errorf("requested a response with invalid length %d", size)
158	}
159	body := make([]byte, size)
160	switch t {
161	case testpb.PayloadType_COMPRESSABLE:
162	case testpb.PayloadType_UNCOMPRESSABLE:
163		return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported")
164	default:
165		return nil, fmt.Errorf("unsupported payload type: %d", t)
166	}
167	return &testpb.Payload{
168		Type: t,
169		Body: body,
170	}, nil
171}
172
// UnaryCall validates incoming metadata (requiring :authority), exercises the
// header/trailer paths selected by the testServer knobs, verifies peer/TLS
// information when security is configured, sleeps unaryCallSleepTime, and
// finally returns a payload of the requested type and size.
func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if ok {
		if _, exists := md[":authority"]; !exists {
			return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
		}
		// Exercise one of three header-sending paths depending on knobs:
		// SetHeader then SendHeader, SetHeader twice, or a single SendHeader.
		if s.setAndSendHeader {
			if err := grpc.SetHeader(ctx, md); err != nil {
				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
			}
			if err := grpc.SendHeader(ctx, testMetadata2); err != nil {
				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err)
			}
		} else if s.setHeaderOnly {
			if err := grpc.SetHeader(ctx, md); err != nil {
				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
			}
			if err := grpc.SetHeader(ctx, testMetadata2); err != nil {
				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err)
			}
		} else {
			if err := grpc.SendHeader(ctx, md); err != nil {
				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
			}
		}
		if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
			return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
		}
		if s.multipleSetTrailer {
			if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil {
				return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err)
			}
		}
	}
	pr, ok := peer.FromContext(ctx)
	if !ok {
		return nil, status.Error(codes.DataLoss, "failed to get peer from ctx")
	}
	if pr.Addr == net.Addr(nil) {
		return nil, status.Error(codes.DataLoss, "failed to get peer address")
	}
	if s.security != "" {
		// Check Auth info: only TLS is recognized; the server name must match
		// the one baked into the test certificates.
		var authType, serverName string
		switch info := pr.AuthInfo.(type) {
		case credentials.TLSInfo:
			authType = info.AuthType()
			serverName = info.State.ServerName
		default:
			return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type")
		}
		if authType != s.security {
			return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security)
		}
		if serverName != "x.test.youtube.com" {
			return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName)
		}
	}
	// Simulate some service delay.
	time.Sleep(s.unaryCallSleepTime)

	payload, err := newPayload(in.GetResponseType(), in.GetResponseSize())
	if err != nil {
		return nil, err
	}

	return &testpb.SimpleResponse{
		Payload: payload,
	}, nil
}
243
244func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
245	if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
246		if _, exists := md[":authority"]; !exists {
247			return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
248		}
249		// For testing purpose, returns an error if user-agent is failAppUA.
250		// To test that client gets the correct error.
251		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
252			return status.Error(codes.DataLoss, "error for testing: "+failAppUA)
253		}
254	}
255	cs := args.GetResponseParameters()
256	for _, c := range cs {
257		if us := c.GetIntervalUs(); us > 0 {
258			time.Sleep(time.Duration(us) * time.Microsecond)
259		}
260
261		payload, err := newPayload(args.GetResponseType(), c.GetSize())
262		if err != nil {
263			return err
264		}
265
266		if err := stream.Send(&testpb.StreamingOutputCallResponse{
267			Payload: payload,
268		}); err != nil {
269			return err
270		}
271	}
272	return nil
273}
274
275func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
276	var sum int
277	for {
278		in, err := stream.Recv()
279		if err == io.EOF {
280			return stream.SendAndClose(&testpb.StreamingInputCallResponse{
281				AggregatedPayloadSize: int32(sum),
282			})
283		}
284		if err != nil {
285			return err
286		}
287		p := in.GetPayload().GetBody()
288		sum += len(p)
289		if s.earlyFail {
290			return status.Error(codes.NotFound, "not found")
291		}
292	}
293}
294
// FullDuplexCall first exercises the header/trailer knobs (same three paths
// as UnaryCall), then echoes responses for every request message until the
// client closes the stream. ResourceExhausted send/recv errors are rewritten
// to Internal to support testSvrWriteStatusEarlyWrite.
func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
	md, ok := metadata.FromIncomingContext(stream.Context())
	if ok {
		if s.setAndSendHeader {
			if err := stream.SetHeader(md); err != nil {
				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
			}
			if err := stream.SendHeader(testMetadata2); err != nil {
				return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
			}
		} else if s.setHeaderOnly {
			if err := stream.SetHeader(md); err != nil {
				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
			}
			if err := stream.SetHeader(testMetadata2); err != nil {
				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
			}
		} else {
			if err := stream.SendHeader(md); err != nil {
				return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
			}
		}
		stream.SetTrailer(testTrailerMetadata)
		if s.multipleSetTrailer {
			stream.SetTrailer(testTrailerMetadata2)
		}
	}
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			// read done.
			return nil
		}
		if err != nil {
			// to facilitate testSvrWriteStatusEarlyWrite
			if status.Code(err) == codes.ResourceExhausted {
				return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
			}
			return err
		}
		cs := in.GetResponseParameters()
		for _, c := range cs {
			if us := c.GetIntervalUs(); us > 0 {
				time.Sleep(time.Duration(us) * time.Microsecond)
			}

			payload, err := newPayload(in.GetResponseType(), c.GetSize())
			if err != nil {
				return err
			}

			if err := stream.Send(&testpb.StreamingOutputCallResponse{
				Payload: payload,
			}); err != nil {
				// to facilitate testSvrWriteStatusEarlyWrite
				if status.Code(err) == codes.ResourceExhausted {
					return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
				}
				return err
			}
		}
	}
}
358
359func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
360	var msgBuf []*testpb.StreamingOutputCallRequest
361	for {
362		in, err := stream.Recv()
363		if err == io.EOF {
364			// read done.
365			break
366		}
367		if err != nil {
368			return err
369		}
370		msgBuf = append(msgBuf, in)
371	}
372	for _, m := range msgBuf {
373		cs := m.GetResponseParameters()
374		for _, c := range cs {
375			if us := c.GetIntervalUs(); us > 0 {
376				time.Sleep(time.Duration(us) * time.Microsecond)
377			}
378
379			payload, err := newPayload(m.GetResponseType(), c.GetSize())
380			if err != nil {
381				return err
382			}
383
384			if err := stream.Send(&testpb.StreamingOutputCallResponse{
385				Payload: payload,
386			}); err != nil {
387				return err
388			}
389		}
390	}
391	return nil
392}
393
// env describes one test environment (transport, security, balancer, and
// server-transport flavor) that the end-to-end tests iterate over.
type env struct {
	name         string // Human-readable name used by the --only_env flag.
	network      string // The type of network such as tcp, unix, etc.
	security     string // The security protocol such as TLS, SSH, etc.
	httpHandler  bool   // whether to use the http.Handler ServerTransport; requires TLS
	balancer     string // One of "round_robin", "pick_first", or "".
	customDialer func(string, string, time.Duration) (net.Conn, error)
}
402
403func (e env) runnable() bool {
404	if runtime.GOOS == "windows" && e.network == "unix" {
405		return false
406	}
407	return true
408}
409
410func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
411	if e.customDialer != nil {
412		return e.customDialer(e.network, addr, timeout)
413	}
414	return net.DialTimeout(e.network, addr, timeout)
415}
416
var (
	// The concrete environments exercised by listTestEnv; allEnv is the full
	// iteration set.
	tcpClearEnv   = env{name: "tcp-clear-v1-balancer", network: "tcp"}
	tcpTLSEnv     = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls"}
	tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"}
	tcpTLSRREnv   = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"}
	handlerEnv    = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"}
	noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"}
	allEnv        = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv}
)

// NOTE(review): the help text below mentions 'unix-clear'/'unix-tls', but no
// unix environments appear in allEnv above — the flag text looks stale;
// confirm against the full file before relying on it.
var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
428
429func listTestEnv() (envs []env) {
430	if *onlyEnv != "" {
431		for _, e := range allEnv {
432			if e.name == *onlyEnv {
433				if !e.runnable() {
434					panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS))
435				}
436				return []env{e}
437			}
438		}
439		panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv))
440	}
441	for _, e := range allEnv {
442		if e.runnable() {
443			envs = append(envs, e)
444		}
445	}
446	return envs
447}
448
// test is an end-to-end test. It should be created with the newTest
// func, modified as needed, and then started with its startServer method.
// It should be cleaned up with the tearDown method.
type test struct {
	// The following are setup in newTest().
	t      *testing.T
	e      env
	ctx    context.Context // valid for life of test, before tearDown
	cancel context.CancelFunc

	// The following knobs are for the server-side, and should be set after
	// calling newTest() and before calling startServer().

	// whether or not to expose the server's health via the default health
	// service implementation.
	enableHealthServer bool
	// In almost all cases, one should set the 'enableHealthServer' flag above to
	// expose the server's health using the default health service
	// implementation. This should only be used when a non-default health service
	// implementation is required.
	healthServer            healthpb.HealthServer
	maxStream               uint32
	tapHandle               tap.ServerInHandle
	maxServerMsgSize        *int
	maxServerReceiveMsgSize *int
	maxServerSendMsgSize    *int
	maxServerHeaderListSize *uint32
	// Used to test the deprecated API WithCompressor and WithDecompressor.
	serverCompression           bool
	unknownHandler              grpc.StreamHandler
	unaryServerInt              grpc.UnaryServerInterceptor
	streamServerInt             grpc.StreamServerInterceptor
	serverInitialWindowSize     int32
	serverInitialConnWindowSize int32
	customServerOptions         []grpc.ServerOption

	// The following knobs are for the client-side, and should be set after
	// calling newTest() and before calling clientConn().
	maxClientMsgSize        *int
	maxClientReceiveMsgSize *int
	maxClientSendMsgSize    *int
	maxClientHeaderListSize *uint32
	userAgent               string
	// Used to test the deprecated API WithCompressor and WithDecompressor.
	clientCompression bool
	// Used to test the new compressor registration API UseCompressor.
	clientUseCompression bool
	// clientNopCompression is set to create a compressor whose type is not supported.
	clientNopCompression        bool
	unaryClientInt              grpc.UnaryClientInterceptor
	streamClientInt             grpc.StreamClientInterceptor
	sc                          <-chan grpc.ServiceConfig
	customCodec                 encoding.Codec
	clientInitialWindowSize     int32
	clientInitialConnWindowSize int32
	perRPCCreds                 credentials.PerRPCCredentials
	customDialOptions           []grpc.DialOption
	resolverScheme              string

	// All test dialing is blocking by default. Set this to true if dial
	// should be non-blocking.
	nonBlockingDial bool

	// These are set once startServer is called. The common case is to have
	// only one testServer.
	srv     stopper
	hSrv    healthpb.HealthServer
	srvAddr string

	// These are set once startServers is called.
	srvs     []stopper
	hSrvs    []healthpb.HealthServer
	srvAddrs []string

	cc          *grpc.ClientConn // nil until requested via clientConn
	restoreLogs func()           // nil unless declareLogNoise is used
}
526
// stopper is the shutdown API shared by the servers this harness can start:
// *grpc.Server implements it directly, and wrapHS adapts *http.Server to it.
type stopper interface {
	Stop()
	GracefulStop()
}
531
532func (te *test) tearDown() {
533	if te.cancel != nil {
534		te.cancel()
535		te.cancel = nil
536	}
537
538	if te.cc != nil {
539		te.cc.Close()
540		te.cc = nil
541	}
542
543	if te.restoreLogs != nil {
544		te.restoreLogs()
545		te.restoreLogs = nil
546	}
547
548	if te.srv != nil {
549		te.srv.Stop()
550	}
551	for _, s := range te.srvs {
552		s.Stop()
553	}
554}
555
556// newTest returns a new test using the provided testing.T and
557// environment.  It is returned with default values. Tests should
558// modify it before calling its startServer and clientConn methods.
559func newTest(t *testing.T, e env) *test {
560	te := &test{
561		t:         t,
562		e:         e,
563		maxStream: math.MaxUint32,
564	}
565	te.ctx, te.cancel = context.WithCancel(context.Background())
566	return te
567}
568
// listenAndServe assembles server options from the test's knobs, listens via
// the supplied listen func (so callers can inject wrapped listeners), starts
// either a plain gRPC server or — for httpHandler environments — an
// http.Server serving gRPC over TLS, records srv/hSrv/srvAddr on te, and
// returns the listener. Serving happens on a background goroutine.
func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
	te.t.Helper()
	te.t.Logf("Running test in %s environment...", te.e.name)
	// Translate each set knob into the corresponding grpc.ServerOption.
	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
	if te.maxServerMsgSize != nil {
		sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize))
	}
	if te.maxServerReceiveMsgSize != nil {
		sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
	}
	if te.maxServerSendMsgSize != nil {
		sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
	}
	if te.maxServerHeaderListSize != nil {
		sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize))
	}
	if te.tapHandle != nil {
		sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
	}
	if te.serverCompression {
		sopts = append(sopts,
			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
		)
	}
	if te.unaryServerInt != nil {
		sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
	}
	if te.streamServerInt != nil {
		sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
	}
	if te.unknownHandler != nil {
		sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
	}
	if te.serverInitialWindowSize > 0 {
		sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
	}
	if te.serverInitialConnWindowSize > 0 {
		sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
	}
	la := "localhost:0"
	switch te.e.network {
	case "unix":
		// Use a unique socket path per run and remove any stale socket file.
		la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
		syscall.Unlink(la)
	}
	lis, err := listen(te.e.network, la)
	if err != nil {
		te.t.Fatalf("Failed to listen: %v", err)
	}
	if te.e.security == "tls" {
		creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
		if err != nil {
			te.t.Fatalf("Failed to generate credentials %v", err)
		}
		sopts = append(sopts, grpc.Creds(creds))
	}
	sopts = append(sopts, te.customServerOptions...)
	s := grpc.NewServer(sopts...)
	if ts != nil {
		testpb.RegisterTestServiceServer(s, ts)
	}

	// Create a new default health server if enableHealthServer is set, or use
	// the provided one.
	hs := te.healthServer
	if te.enableHealthServer {
		hs = health.NewServer()
	}
	if hs != nil {
		healthgrpc.RegisterHealthServer(s, hs)
	}

	// For TCP, rewrite the address to localhost:<chosen-port>; unix sockets
	// keep the socket path as-is.
	addr := la
	switch te.e.network {
	case "unix":
	default:
		_, port, err := net.SplitHostPort(lis.Addr().String())
		if err != nil {
			te.t.Fatalf("Failed to parse listener address: %v", err)
		}
		addr = "localhost:" + port
	}

	te.srv = s
	te.hSrv = hs
	te.srvAddr = addr

	if te.e.httpHandler {
		// Serve the grpc.Server as an http.Handler over an http.Server with
		// TLS; te.srv is replaced by a wrapper that adapts Shutdown/Close to
		// the stopper interface.
		if te.e.security != "tls" {
			te.t.Fatalf("unsupported environment settings")
		}
		cert, err := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
		if err != nil {
			te.t.Fatal("tls.LoadX509KeyPair(server1.pem, server1.key) failed: ", err)
		}
		hs := &http.Server{
			Handler:   s,
			TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
		}
		if err := http2.ConfigureServer(hs, &http2.Server{MaxConcurrentStreams: te.maxStream}); err != nil {
			te.t.Fatal("http2.ConfigureServer(_, _) failed: ", err)
		}
		te.srv = wrapHS{hs}
		tlsListener := tls.NewListener(lis, hs.TLSConfig)
		go hs.Serve(tlsListener)
		return lis
	}

	go s.Serve(lis)
	return lis
}
681
// wrapHS adapts an *http.Server (used by the handler-tls environment) to the
// stopper interface implemented by *grpc.Server.
type wrapHS struct {
	s *http.Server
}

// GracefulStop drains in-flight requests before shutting down, mirroring
// (*grpc.Server).GracefulStop. The Shutdown error is intentionally ignored.
func (w wrapHS) GracefulStop() {
	w.s.Shutdown(context.Background())
}

// Stop closes the server immediately, mirroring (*grpc.Server).Stop.
func (w wrapHS) Stop() {
	w.s.Close()
}
693
// startServerWithConnControl starts the server on a listener that lets tests
// manipulate accepted connections, and returns that listener wrapper.
func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper {
	l := te.listenAndServe(ts, listenWithConnControl)
	return l.(*listenerWrapper)
}
698
// startServer starts a gRPC server exposing the provided TestService
// implementation on a plain net.Listen listener. Callers should defer a call
// to te.tearDown to clean up.
func (te *test) startServer(ts testpb.TestServiceServer) {
	te.t.Helper()
	te.listenAndServe(ts, net.Listen)
}
705
// startServers starts 'num' gRPC servers exposing the provided TestService,
// moving each started server's handle, health server, and address from the
// single-server fields into the srvs/hSrvs/srvAddrs slices.
// NOTE(review): the te.srv.(*grpc.Server) assertion will panic for the
// handler-tls environment, whose srv is a wrapHS — callers appear to avoid
// that combination; confirm before reusing with httpHandler envs.
func (te *test) startServers(ts testpb.TestServiceServer, num int) {
	for i := 0; i < num; i++ {
		te.startServer(ts)
		te.srvs = append(te.srvs, te.srv.(*grpc.Server))
		te.hSrvs = append(te.hSrvs, te.hSrv)
		te.srvAddrs = append(te.srvAddrs, te.srvAddr)
		te.srv = nil
		te.hSrv = nil
		te.srvAddr = ""
	}
}
718
// setHealthServingStatus is a helper function to set the health status on the
// test's health server. It panics if the health server in use is not the
// default *health.Server (i.e. a custom healthServer knob was supplied).
func (te *test) setHealthServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
	hs, ok := te.hSrv.(*health.Server)
	if !ok {
		panic(fmt.Sprintf("SetServingStatus(%v, %v) called for health server of type %T", service, status, hs))
	}
	hs.SetServingStatus(service, status)
}
727
// nopCompressor is a gzip compressor that reports the unregistered type
// "nop", used to exercise the unsupported-compressor error path.
type nopCompressor struct {
	grpc.Compressor
}

// NewNopCompressor creates a compressor to test the case that type is not supported.
func NewNopCompressor() grpc.Compressor {
	return &nopCompressor{grpc.NewGZIPCompressor()}
}

// Type reports the (deliberately unsupported) compressor type name.
func (c *nopCompressor) Type() string {
	return "nop"
}
740
// nopDecompressor is the decompression counterpart of nopCompressor: gzip
// behavior under the unregistered type name "nop".
type nopDecompressor struct {
	grpc.Decompressor
}

// NewNopDecompressor creates a decompressor to test the case that type is not supported.
func NewNopDecompressor() grpc.Decompressor {
	return &nopDecompressor{grpc.NewGZIPDecompressor()}
}

// Type reports the (deliberately unsupported) decompressor type name.
func (d *nopDecompressor) Type() string {
	return "nop"
}
753
// configDial translates the test's client-side knobs into grpc.DialOptions,
// appending them to opts, and returns the options together with the resolver
// scheme prefix ("passthrough:///" by default) to prepend to the target. It
// also defaults srvAddr to a placeholder for client-side-only tests.
func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) {
	opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent))

	if te.sc != nil {
		opts = append(opts, grpc.WithServiceConfig(te.sc))
	}

	if te.clientCompression {
		opts = append(opts,
			grpc.WithCompressor(grpc.NewGZIPCompressor()),
			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
		)
	}
	if te.clientUseCompression {
		opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
	}
	if te.clientNopCompression {
		opts = append(opts,
			grpc.WithCompressor(NewNopCompressor()),
			grpc.WithDecompressor(NewNopDecompressor()),
		)
	}
	if te.unaryClientInt != nil {
		opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt))
	}
	if te.streamClientInt != nil {
		opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
	}
	if te.maxClientMsgSize != nil {
		opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize))
	}
	if te.maxClientReceiveMsgSize != nil {
		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
	}
	if te.maxClientSendMsgSize != nil {
		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
	}
	if te.maxClientHeaderListSize != nil {
		opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize))
	}
	switch te.e.security {
	case "tls":
		creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
		if err != nil {
			te.t.Fatalf("Failed to load credentials: %v", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(creds))
	case "empty":
		// Don't add any transport creds option.
	default:
		opts = append(opts, grpc.WithInsecure())
	}
	// TODO(bar) switch balancer case "pick_first".
	var scheme string
	if te.resolverScheme == "" {
		scheme = "passthrough:///"
	} else {
		scheme = te.resolverScheme + ":///"
	}
	if te.e.balancer != "" {
		opts = append(opts, grpc.WithBalancerName(te.e.balancer))
	}
	if te.clientInitialWindowSize > 0 {
		opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
	}
	if te.clientInitialConnWindowSize > 0 {
		opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
	}
	if te.perRPCCreds != nil {
		opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
	}
	if te.customCodec != nil {
		opts = append(opts, grpc.WithDefaultCallOptions(grpc.ForceCodec(te.customCodec)))
	}
	if !te.nonBlockingDial && te.srvAddr != "" {
		// Only do a blocking dial if server is up.
		opts = append(opts, grpc.WithBlock())
	}
	if te.srvAddr == "" {
		te.srvAddr = "client.side.only.test"
	}
	opts = append(opts, te.customDialOptions...)
	return opts, scheme
}
838
// clientConnWithConnControl returns the test's client connection, creating it
// with a dialer wrapper that lets the test observe/manipulate the raw conn.
// If a connection already exists it is returned with a nil wrapper.
func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) {
	if te.cc != nil {
		return te.cc, nil
	}
	opts, scheme := te.configDial()
	dw := &dialerWrapper{}
	// overwrite the dialer before dialing so the wrapper sees every conn.
	opts = append(opts, grpc.WithDialer(dw.dialer))
	var err error
	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
	if err != nil {
		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
	}
	return te.cc, dw
}
854
855func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn {
856	if te.cc != nil {
857		return te.cc
858	}
859	var scheme string
860	opts, scheme = te.configDial(opts...)
861	var err error
862	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
863	if err != nil {
864		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
865	}
866	return te.cc
867}
868
// declareLogNoise registers the given phrases as expected log noise for this
// test; the restore func is saved so tearDown undoes the registration.
func (te *test) declareLogNoise(phrases ...string) {
	te.restoreLogs = declareLogNoise(te.t, phrases...)
}
872
// withServerTester dials the server's address directly (wrapping the conn in
// TLS when the environment requires it), performs the HTTP/2 greeting, and
// hands the resulting serverTester to fn for low-level frame-by-frame tests.
func (te *test) withServerTester(fn func(st *serverTester)) {
	c, err := te.e.dialer(te.srvAddr, 10*time.Second)
	if err != nil {
		te.t.Fatal(err)
	}
	defer c.Close()
	if te.e.security == "tls" {
		// Skip cert verification: the test only needs a working TLS session.
		c = tls.Client(c, &tls.Config{
			InsecureSkipVerify: true,
			NextProtos:         []string{http2.NextProtoTLS},
		})
	}
	st := newServerTesterFromConn(te.t, c)
	st.greet()
	fn(st)
}
889
890type lazyConn struct {
891	net.Conn
892	beLazy int32
893}
894
895func (l *lazyConn) Write(b []byte) (int, error) {
896	if atomic.LoadInt32(&(l.beLazy)) == 1 {
897		time.Sleep(time.Second)
898	}
899	return l.Conn.Write(b)
900}
901
// TestContextDeadlineNotIgnored verifies that an RPC whose writes are stalled
// (via lazyConn) still fails with DeadlineExceeded promptly, rather than
// blocking until the write completes.
func (s) TestContextDeadlineNotIgnored(t *testing.T) {
	e := noBalancerEnv
	var lc *lazyConn
	// Capture the dialed conn so the test can flip its lazy flag later.
	e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) {
		conn, err := net.DialTimeout(network, addr, timeout)
		if err != nil {
			return nil, err
		}
		lc = &lazyConn{Conn: conn}
		return lc, nil
	}

	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Sanity check: the connection works before the conn is made lazy.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	atomic.StoreInt32(&(lc.beLazy), 1)
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	t1 := time.Now()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
	}
	// The RPC must fail near its 50ms deadline, not after the 1s write stall.
	if time.Since(t1) > 2*time.Second {
		t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
	}
}
934
// TestTimeoutOnDeadServer runs testTimeoutOnDeadServer in every runnable
// environment.
func (s) TestTimeoutOnDeadServer(t *testing.T) {
	for _, e := range listTestEnv() {
		testTimeoutOnDeadServer(t, e)
	}
}
940
// testTimeoutOnDeadServer verifies that after the server is stopped the
// client leaves the Ready state and subsequent wait-for-ready RPCs fail with
// DeadlineExceeded rather than hanging.
func testTimeoutOnDeadServer(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Confirm the connection works, then kill the server.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	te.srv.Stop()

	// Wait for the client to notice the connection is gone.
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	state := cc.GetState()
	for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
	}
	cancel()
	if state == connectivity.Ready {
		t.Fatalf("Timed out waiting for non-ready state")
	}
	// A wait-for-ready RPC with a tiny deadline must now time out.
	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
	_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
	cancel()
	if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded {
		// If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error,
		// the error will be an internal error.
		t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
	}
	awaitNewConnLogOutput()
}
978
979func (s) TestServerGracefulStopIdempotent(t *testing.T) {
980	for _, e := range listTestEnv() {
981		if e.name == "handler-tls" {
982			continue
983		}
984		testServerGracefulStopIdempotent(t, e)
985	}
986}
987
988func testServerGracefulStopIdempotent(t *testing.T, e env) {
989	te := newTest(t, e)
990	te.userAgent = testAppUA
991	te.startServer(&testServer{security: e.security})
992	defer te.tearDown()
993
994	for i := 0; i < 3; i++ {
995		te.srv.GracefulStop()
996	}
997}
998
999func (s) TestServerGoAway(t *testing.T) {
1000	for _, e := range listTestEnv() {
1001		if e.name == "handler-tls" {
1002			continue
1003		}
1004		testServerGoAway(t, e)
1005	}
1006}
1007
// testServerGoAway verifies that after the server issues a GoAway (via
// GracefulStop), new RPCs on the drained connection fail and the stop
// eventually completes.
func testServerGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Finish an RPC to make sure the connection is good.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	// Run GracefulStop in the background; ch is closed when it returns.
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	// DeadlineExceeded is ignored here: a 10ms timeout may simply be too
	// short for a successful round trip, which tells us nothing about GoAway.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
			cancel()
			break
		}
		cancel()
	}
	// A new RPC should fail.
	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
	}
	// Wait for the background GracefulStop to finish.
	<-ch
	awaitNewConnLogOutput()
}
1045
1046func (s) TestServerGoAwayPendingRPC(t *testing.T) {
1047	for _, e := range listTestEnv() {
1048		if e.name == "handler-tls" {
1049			continue
1050		}
1051		testServerGoAwayPendingRPC(t, e)
1052	}
1053}
1054
// testServerGoAwayPendingRPC verifies that an RPC already in flight when
// the server issues a GoAway can still send and receive messages, while
// new RPCs on the drained connection fail.
func testServerGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	// Suppress expected transport-teardown noise from the test logs.
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Open a stream before GoAway; cancel() below ends it at the bottom
	// of the test.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	// Run GracefulStop in the background; it blocks on the open stream.
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	start := time.Now()
	errored := false
	for time.Since(start) < time.Second {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
		cancel()
		if err != nil {
			errored = true
			break
		}
	}
	if !errored {
		t.Fatalf("GoAway never received by client")
	}
	respParam := []*testpb.ResponseParameters{{Size: 1}}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	// The RPC will run until canceled. Canceling it lets the pending
	// GracefulStop goroutine finish, which closes ch.
	cancel()
	<-ch
	awaitNewConnLogOutput()
}
1119
1120func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
1121	for _, e := range listTestEnv() {
1122		if e.name == "handler-tls" {
1123			continue
1124		}
1125		testServerMultipleGoAwayPendingRPC(t, e)
1126	}
1127}
1128
// testServerMultipleGoAwayPendingRPC verifies that two concurrent
// GracefulStop calls both block while an RPC is pending, that the
// pending RPC keeps working, and that both calls return once the RPC
// completes.
func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	// Suppress expected transport-teardown noise from the test logs.
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Open a stream that keeps both GracefulStop calls pending.
	ctx, cancel := context.WithCancel(context.Background())
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	// First concurrent GracefulStop; ch1 closes when it returns.
	ch1 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch1)
	}()
	// Second concurrent GracefulStop; ch2 closes when it returns.
	ch2 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch2)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
			cancel()
			break
		}
		cancel()
	}
	// Neither GracefulStop may return while the stream is still open.
	select {
	case <-ch1:
		t.Fatal("GracefulStop() terminated early")
	case <-ch2:
		t.Fatal("GracefulStop() terminated early")
	default:
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	// Ending the stream lets both pending GracefulStop calls complete.
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
	}
	<-ch1
	<-ch2
	cancel()
	awaitNewConnLogOutput()
}
1206
1207func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
1208	for _, e := range listTestEnv() {
1209		if e.name == "handler-tls" {
1210			continue
1211		}
1212		testConcurrentClientConnCloseAndServerGoAway(t, e)
1213	}
1214}
1215
1216func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
1217	te := newTest(t, e)
1218	te.userAgent = testAppUA
1219	te.declareLogNoise(
1220		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1221		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1222		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1223	)
1224	te.startServer(&testServer{security: e.security})
1225	defer te.tearDown()
1226
1227	cc := te.clientConn()
1228	tc := testpb.NewTestServiceClient(cc)
1229	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1230		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1231	}
1232	ch := make(chan struct{})
1233	// Close ClientConn and Server concurrently.
1234	go func() {
1235		te.srv.GracefulStop()
1236		close(ch)
1237	}()
1238	go func() {
1239		cc.Close()
1240	}()
1241	<-ch
1242}
1243
1244func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
1245	for _, e := range listTestEnv() {
1246		if e.name == "handler-tls" {
1247			continue
1248		}
1249		testConcurrentServerStopAndGoAway(t, e)
1250	}
1251}
1252
// testConcurrentServerStopAndGoAway verifies that calling Stop while a
// GracefulStop is pending kills the in-flight stream: Send eventually
// returns io.EOF and Recv returns a non-nil, non-EOF error.
func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	// Suppress expected transport-teardown noise from the test logs.
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Open a stream that keeps the GracefulStop below pending.
	stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	// Run GracefulStop in the background; ch closes when it returns.
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
			cancel()
			break
		}
		cancel()
	}
	// Stop the server and close all the connections.
	te.srv.Stop()
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// Keep sending until the hard Stop is observed as io.EOF, with a 2s
	// overall budget.
	sendStart := time.Now()
	for {
		if err := stream.Send(req); err == io.EOF {
			// stream.Send should eventually send io.EOF
			break
		} else if err != nil {
			// Send should never return a transport-level error.
			t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
		}
		if time.Since(sendStart) > 2*time.Second {
			t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
		}
		time.Sleep(time.Millisecond)
	}
	// Recv must surface the real stream error, not a clean EOF.
	if _, err := stream.Recv(); err == nil || err == io.EOF {
		t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
	}
	<-ch
	awaitNewConnLogOutput()
}
1324
1325func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
1326	for _, e := range listTestEnv() {
1327		if e.name == "handler-tls" {
1328			continue
1329		}
1330		testClientConnCloseAfterGoAwayWithActiveStream(t, e)
1331	}
1332}
1333
1334func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
1335	te := newTest(t, e)
1336	te.startServer(&testServer{security: e.security})
1337	defer te.tearDown()
1338	cc := te.clientConn()
1339	tc := testpb.NewTestServiceClient(cc)
1340
1341	ctx, cancel := context.WithCancel(context.Background())
1342	defer cancel()
1343	if _, err := tc.FullDuplexCall(ctx); err != nil {
1344		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
1345	}
1346	done := make(chan struct{})
1347	go func() {
1348		te.srv.GracefulStop()
1349		close(done)
1350	}()
1351	time.Sleep(50 * time.Millisecond)
1352	cc.Close()
1353	timeout := time.NewTimer(time.Second)
1354	select {
1355	case <-done:
1356	case <-timeout.C:
1357		t.Fatalf("Test timed-out.")
1358	}
1359}
1360
1361func (s) TestFailFast(t *testing.T) {
1362	for _, e := range listTestEnv() {
1363		testFailFast(t, e)
1364	}
1365}
1366
// testFailFast verifies that once the server is torn down, fail-fast
// RPCs fail immediately with Unavailable while the client keeps trying
// to reconnect.
func testFailFast(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	// Suppress expected transport-teardown noise from the test logs.
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Confirm the connection works before killing the server.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	// Stop the server and tear down all the existing connections.
	te.srv.Stop()
	// Loop until the server teardown is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{})
		cancel()
		if status.Code(err) == codes.Unavailable {
			break
		}
		t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err)
		time.Sleep(10 * time.Millisecond)
	}
	// The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
	}
	if _, err := tc.StreamingInputCall(context.Background()); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
	}

	awaitNewConnLogOutput()
}
1408
1409func testServiceConfigSetup(t *testing.T, e env) *test {
1410	te := newTest(t, e)
1411	te.userAgent = testAppUA
1412	te.declareLogNoise(
1413		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1414		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1415		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1416		"Failed to dial : context canceled; please retry.",
1417	)
1418	return te
1419}
1420
// newBool returns a pointer to a freshly allocated bool holding b.
func newBool(b bool) *bool {
	v := b
	return &v
}
1424
// newInt returns a pointer to a freshly allocated int holding b.
func newInt(b int) *int {
	v := b
	return &v
}
1428
// newDuration returns a pointer to a freshly allocated time.Duration
// holding b. Simplified to return the parameter's address directly,
// matching the newBool/newInt helpers; the explicit new+assign+naked
// return was needlessly verbose.
func newDuration(b time.Duration) *time.Duration {
	return &b
}
1434
// TestGetMethodConfig verifies that method configs delivered via the
// resolver's service config are applied to RPCs, and that a subsequent
// config update replaces the method-specific entry with the
// service-level default. No server is started, so the outcome of each
// RPC is determined purely by waitForReady/timeout settings.
func (s) TestGetMethodConfig(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	addrs := []resolver.Address{{Addr: te.srvAddr}}
	// EmptyCall gets a method-specific entry (waitForReady=true, 1ms
	// timeout); all other methods fall back to the service-level
	// default (waitForReady=false).
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".001s"
        },
        {
            "name": [
                {
                    "service": "grpc.testing.TestService"
                }
            ],
            "waitForReady": false
        }
    ]
}`)})

	tc := testpb.NewTestServiceClient(cc)

	// Make sure service config has been processed by grpc.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	var err error
	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}

	// Push a new config in which EmptyCall no longer has a
	// method-specific entry; it should now pick up the service-level
	// default (waitForReady=false).
	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "UnaryCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".001s"
        },
        {
            "name": [
                {
                    "service": "grpc.testing.TestService"
                }
            ],
            "waitForReady": false
        }
    ]
}`)})

	// Make sure service config has been processed by grpc.
	for {
		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
			break
		}
		time.Sleep(time.Millisecond)
	}
	// The following RPCs are expected to become fail-fast.
	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
	}
}
1520
// TestServiceConfigWaitForReady verifies the interaction between the
// per-call WaitForReady option and the service config's waitForReady
// field. No server is started, so a wait-for-ready RPC blocks until its
// 1ms service-config timeout fires (DeadlineExceeded).
func (s) TestServiceConfigWaitForReady(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	addrs := []resolver.Address{{Addr: te.srvAddr}}
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": false,
            "timeout": ".001s"
        }
    ]
}`)})

	tc := testpb.NewTestServiceClient(cc)

	// Make sure service config has been processed by grpc.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	var err error
	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}

	// Generate a service config update.
	// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".001s"
        }
    ]
}`)})

	// Wait for the new service config to take effect.
	for {
		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
			break
		}
		time.Sleep(time.Millisecond)
	}
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
}
1609
// TestServiceConfigTimeout verifies that the effective RPC deadline is
// the minimum of the caller's context timeout and the service config's
// timeout field. No server is started, so each wait-for-ready RPC
// blocks until whichever deadline is shorter fires.
func (s) TestServiceConfigTimeout(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	te.resolverScheme = r.Scheme()
	cc := te.clientConn()
	addrs := []resolver.Address{{Addr: te.srvAddr}}
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": true,
            "timeout": "3600s"
        }
    ]
}`)})

	tc := testpb.NewTestServiceClient(cc)

	// Make sure service config has been processed by grpc.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
	var err error
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()

	// Generate a service config update.
	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	r.UpdateState(resolver.State{
		Addresses: addrs,
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "EmptyCall"
                },
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": true,
            "timeout": ".000000001s"
        }
    ]
}`)})

	// Wait for the new service config to take effect.
	for {
		if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
			break
		}
		time.Sleep(time.Millisecond)
	}

	// The 1ns service-config timeout now dominates the 1hr call timeout.
	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()
}
1708
1709func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
1710	e := tcpClearRREnv
1711	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1712	defer rcleanup()
1713
1714	// Setting up values and objects shared across all test cases.
1715	const smallSize = 1
1716	const largeSize = 1024
1717	const extraLargeSize = 2048
1718
1719	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1720	if err != nil {
1721		t.Fatal(err)
1722	}
1723	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1724	if err != nil {
1725		t.Fatal(err)
1726	}
1727	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
1728	if err != nil {
1729		t.Fatal(err)
1730	}
1731
1732	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1733	te1 := testServiceConfigSetup(t, e)
1734	defer te1.tearDown()
1735
1736	te1.resolverScheme = r.Scheme()
1737	te1.nonBlockingDial = true
1738	te1.startServer(&testServer{security: e.security})
1739	cc1 := te1.clientConn()
1740
1741	addrs := []resolver.Address{{Addr: te1.srvAddr}}
1742	sc := parseCfg(r, `{
1743    "methodConfig": [
1744        {
1745            "name": [
1746                {
1747                    "service": "grpc.testing.TestService",
1748                    "method": "UnaryCall"
1749                },
1750                {
1751                    "service": "grpc.testing.TestService",
1752                    "method": "FullDuplexCall"
1753                }
1754            ],
1755            "maxRequestMessageBytes": 2048,
1756            "maxResponseMessageBytes": 2048
1757        }
1758    ]
1759}`)
1760	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
1761	tc := testpb.NewTestServiceClient(cc1)
1762
1763	req := &testpb.SimpleRequest{
1764		ResponseType: testpb.PayloadType_COMPRESSABLE,
1765		ResponseSize: int32(extraLargeSize),
1766		Payload:      smallPayload,
1767	}
1768
1769	for {
1770		if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1771			break
1772		}
1773		time.Sleep(time.Millisecond)
1774	}
1775
1776	// Test for unary RPC recv.
1777	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
1778		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1779	}
1780
1781	// Test for unary RPC send.
1782	req.Payload = extraLargePayload
1783	req.ResponseSize = int32(smallSize)
1784	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1785		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1786	}
1787
1788	// Test for streaming RPC recv.
1789	respParam := []*testpb.ResponseParameters{
1790		{
1791			Size: int32(extraLargeSize),
1792		},
1793	}
1794	sreq := &testpb.StreamingOutputCallRequest{
1795		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1796		ResponseParameters: respParam,
1797		Payload:            smallPayload,
1798	}
1799	stream, err := tc.FullDuplexCall(te1.ctx)
1800	if err != nil {
1801		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1802	}
1803	if err = stream.Send(sreq); err != nil {
1804		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1805	}
1806	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1807		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1808	}
1809
1810	// Test for streaming RPC send.
1811	respParam[0].Size = int32(smallSize)
1812	sreq.Payload = extraLargePayload
1813	stream, err = tc.FullDuplexCall(te1.ctx)
1814	if err != nil {
1815		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1816	}
1817	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1818		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1819	}
1820
1821	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1822	te2 := testServiceConfigSetup(t, e)
1823	te2.resolverScheme = r.Scheme()
1824	te2.nonBlockingDial = true
1825	te2.maxClientReceiveMsgSize = newInt(1024)
1826	te2.maxClientSendMsgSize = newInt(1024)
1827
1828	te2.startServer(&testServer{security: e.security})
1829	defer te2.tearDown()
1830	cc2 := te2.clientConn()
1831	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc})
1832	tc = testpb.NewTestServiceClient(cc2)
1833
1834	for {
1835		if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1836			break
1837		}
1838		time.Sleep(time.Millisecond)
1839	}
1840
1841	// Test for unary RPC recv.
1842	req.Payload = smallPayload
1843	req.ResponseSize = int32(largeSize)
1844
1845	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
1846		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1847	}
1848
1849	// Test for unary RPC send.
1850	req.Payload = largePayload
1851	req.ResponseSize = int32(smallSize)
1852	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1853		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1854	}
1855
1856	// Test for streaming RPC recv.
1857	stream, err = tc.FullDuplexCall(te2.ctx)
1858	respParam[0].Size = int32(largeSize)
1859	sreq.Payload = smallPayload
1860	if err != nil {
1861		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1862	}
1863	if err = stream.Send(sreq); err != nil {
1864		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1865	}
1866	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1867		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1868	}
1869
1870	// Test for streaming RPC send.
1871	respParam[0].Size = int32(smallSize)
1872	sreq.Payload = largePayload
1873	stream, err = tc.FullDuplexCall(te2.ctx)
1874	if err != nil {
1875		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1876	}
1877	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1878		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1879	}
1880
1881	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1882	te3 := testServiceConfigSetup(t, e)
1883	te3.resolverScheme = r.Scheme()
1884	te3.nonBlockingDial = true
1885	te3.maxClientReceiveMsgSize = newInt(4096)
1886	te3.maxClientSendMsgSize = newInt(4096)
1887
1888	te3.startServer(&testServer{security: e.security})
1889	defer te3.tearDown()
1890
1891	cc3 := te3.clientConn()
1892	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: sc})
1893	tc = testpb.NewTestServiceClient(cc3)
1894
1895	for {
1896		if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1897			break
1898		}
1899		time.Sleep(time.Millisecond)
1900	}
1901
1902	// Test for unary RPC recv.
1903	req.Payload = smallPayload
1904	req.ResponseSize = int32(largeSize)
1905
1906	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil {
1907		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1908	}
1909
1910	req.ResponseSize = int32(extraLargeSize)
1911	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1912		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1913	}
1914
1915	// Test for unary RPC send.
1916	req.Payload = largePayload
1917	req.ResponseSize = int32(smallSize)
1918	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
1919		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1920	}
1921
1922	req.Payload = extraLargePayload
1923	if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1924		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1925	}
1926
1927	// Test for streaming RPC recv.
1928	stream, err = tc.FullDuplexCall(te3.ctx)
1929	if err != nil {
1930		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1931	}
1932	respParam[0].Size = int32(largeSize)
1933	sreq.Payload = smallPayload
1934
1935	if err = stream.Send(sreq); err != nil {
1936		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1937	}
1938	if _, err = stream.Recv(); err != nil {
1939		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
1940	}
1941
1942	respParam[0].Size = int32(extraLargeSize)
1943
1944	if err = stream.Send(sreq); err != nil {
1945		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1946	}
1947	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1948		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1949	}
1950
1951	// Test for streaming RPC send.
1952	respParam[0].Size = int32(smallSize)
1953	sreq.Payload = largePayload
1954	stream, err = tc.FullDuplexCall(te3.ctx)
1955	if err != nil {
1956		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1957	}
1958	if err := stream.Send(sreq); err != nil {
1959		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1960	}
1961	sreq.Payload = extraLargePayload
1962	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1963		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1964	}
1965}
1966
// Reading from a streaming RPC may fail with context canceled if timeout was
// set by service config (https://github.com/grpc/grpc-go/issues/1818). This
// test makes sure read from streaming RPC doesn't fail in this case.
func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
	te := testServiceConfigSetup(t, tcpClearRREnv)
	te.startServer(&testServer{security: tcpClearRREnv.security})
	defer te.tearDown()
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()

	te.resolverScheme = r.Scheme()
	te.nonBlockingDial = true
	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	// Push the server address together with a service config that applies a
	// 10s timeout (and waitForReady) to FullDuplexCall.
	r.UpdateState(resolver.State{
		Addresses: []resolver.Address{{Addr: te.srvAddr}},
		ServiceConfig: parseCfg(r, `{
    "methodConfig": [
        {
            "name": [
                {
                    "service": "grpc.testing.TestService",
                    "method": "FullDuplexCall"
                }
            ],
            "waitForReady": true,
            "timeout": "10s"
        }
    ]
}`)})
	// Make sure service config has been processed by grpc.
	// Busy-wait until the method config (with its Timeout) is visible on the
	// ClientConn; there is no notification hook for this.
	for {
		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
			break
		}
		time.Sleep(time.Millisecond)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
	}

	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0)
	if err != nil {
		t.Fatalf("failed to newPayload: %v", err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: []*testpb.ResponseParameters{{Size: 0}},
		Payload:            payload,
	}
	if err := stream.Send(req); err != nil {
		t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err)
	}
	stream.CloseSend()
	time.Sleep(time.Second)
	// Sleep 1 second before recv to make sure the final status is received
	// before the recv. A Recv after the final status arrives must still yield
	// the buffered message, not a context error (issue #1818).
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("stream.Recv = _, %v, want _, <nil>", err)
	}
	// Keep reading to drain the stream.
	for {
		if _, err := stream.Recv(); err != nil {
			break
		}
	}
}
2039
2040func (s) TestPreloaderClientSend(t *testing.T) {
2041	for _, e := range listTestEnv() {
2042		testPreloaderClientSend(t, e)
2043	}
2044}
2045
2046func testPreloaderClientSend(t *testing.T, e env) {
2047	te := newTest(t, e)
2048	te.userAgent = testAppUA
2049	te.declareLogNoise(
2050		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2051		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2052		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2053		"Failed to dial : context canceled; please retry.",
2054	)
2055	te.startServer(&testServer{security: e.security})
2056
2057	defer te.tearDown()
2058	tc := testpb.NewTestServiceClient(te.clientConn())
2059
2060	// Test for streaming RPC recv.
2061	// Set context for send with proper RPC Information
2062	stream, err := tc.FullDuplexCall(te.ctx, grpc.UseCompressor("gzip"))
2063	if err != nil {
2064		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2065	}
2066	var index int
2067	for index < len(reqSizes) {
2068		respParam := []*testpb.ResponseParameters{
2069			{
2070				Size: int32(respSizes[index]),
2071			},
2072		}
2073
2074		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
2075		if err != nil {
2076			t.Fatal(err)
2077		}
2078
2079		req := &testpb.StreamingOutputCallRequest{
2080			ResponseType:       testpb.PayloadType_COMPRESSABLE,
2081			ResponseParameters: respParam,
2082			Payload:            payload,
2083		}
2084		preparedMsg := &grpc.PreparedMsg{}
2085		err = preparedMsg.Encode(stream, req)
2086		if err != nil {
2087			t.Fatalf("PrepareMsg failed for size %d : %v", reqSizes[index], err)
2088		}
2089		if err := stream.SendMsg(preparedMsg); err != nil {
2090			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
2091		}
2092		reply, err := stream.Recv()
2093		if err != nil {
2094			t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
2095		}
2096		pt := reply.GetPayload().GetType()
2097		if pt != testpb.PayloadType_COMPRESSABLE {
2098			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
2099		}
2100		size := len(reply.GetPayload().GetBody())
2101		if size != int(respSizes[index]) {
2102			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
2103		}
2104		index++
2105	}
2106	if err := stream.CloseSend(); err != nil {
2107		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2108	}
2109	if _, err := stream.Recv(); err != io.EOF {
2110		t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
2111	}
2112}
2113
2114func (s) TestMaxMsgSizeClientDefault(t *testing.T) {
2115	for _, e := range listTestEnv() {
2116		testMaxMsgSizeClientDefault(t, e)
2117	}
2118}
2119
// testMaxMsgSizeClientDefault verifies that, with no explicit client limits
// configured, responses of 4MB fail on the client with ResourceExhausted for
// both unary and full-duplex streaming RPCs. (largeSize presumably matches
// the client's default receive limit — TODO confirm against grpc defaults.)
func testMaxMsgSizeClientDefault(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
		"Failed to dial : context canceled; please retry.",
	)
	te.startServer(&testServer{security: e.security})

	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	const smallSize = 1
	const largeSize = 4 * 1024 * 1024
	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}
	// Small request, 4MB response: only the client's receive path is stressed.
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(largeSize),
		Payload:      smallPayload,
	}
	// Test for unary RPC recv.
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(largeSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            smallPayload,
	}

	// Test for streaming RPC recv.
	stream, err := tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
}
2173
2174func (s) TestMaxMsgSizeClientAPI(t *testing.T) {
2175	for _, e := range listTestEnv() {
2176		testMaxMsgSizeClientAPI(t, e)
2177	}
2178}
2179
// testMaxMsgSizeClientAPI sets 1KB client-side send and receive limits via
// the dial-option API and verifies that both oversized requests (send path)
// and oversized responses (receive path) fail with ResourceExhausted, for
// unary and streaming RPCs alike.
func testMaxMsgSizeClientAPI(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	// To avoid error on server side.
	te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
	te.maxClientReceiveMsgSize = newInt(1024)
	te.maxClientSendMsgSize = newInt(1024)
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
		"Failed to dial : context canceled; please retry.",
	)
	te.startServer(&testServer{security: e.security})

	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	const smallSize = 1
	// largeSize equals the configured 1024-byte limit; the encoded message
	// (payload plus proto framing) therefore exceeds it.
	const largeSize = 1024
	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}

	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(largeSize),
		Payload:      smallPayload,
	}
	// Test for unary RPC recv.
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	// Note: req is reused; payload and response size are swapped so only the
	// send path exceeds the limit now.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(largeSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            smallPayload,
	}

	// Test for streaming RPC recv.
	stream, err := tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}
}
2260
2261func (s) TestMaxMsgSizeServerAPI(t *testing.T) {
2262	for _, e := range listTestEnv() {
2263		testMaxMsgSizeServerAPI(t, e)
2264	}
2265}
2266
// testMaxMsgSizeServerAPI sets 1KB server-side send and receive limits via
// the server-option API and verifies that violations on either direction are
// surfaced to the client as ResourceExhausted, for unary and streaming RPCs.
// Note the "send"/"recv" comments below are from the server's perspective.
func testMaxMsgSizeServerAPI(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.maxServerReceiveMsgSize = newInt(1024)
	te.maxServerSendMsgSize = newInt(1024)
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
		"Failed to dial : context canceled; please retry.",
	)
	te.startServer(&testServer{security: e.security})

	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	const smallSize = 1
	// largeSize equals the configured 1024-byte limit; the encoded message
	// (payload plus proto framing) therefore exceeds it.
	const largeSize = 1024
	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}

	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	// Small request, large response: the server's send limit is exceeded.
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(largeSize),
		Payload:      smallPayload,
	}
	// Test for unary RPC send.
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC recv.
	// Large request, small response: now the server's receive limit trips.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(largeSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            smallPayload,
	}

	// Test for streaming RPC send.
	stream, err := tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
}
2348
2349func (s) TestTap(t *testing.T) {
2350	for _, e := range listTestEnv() {
2351		if e.name == "handler-tls" {
2352			continue
2353		}
2354		testTap(t, e)
2355	}
2356}
2357
// myTap is a server tap (installed via te.tapHandle) that counts EmptyCall
// RPCs and rejects UnaryCall RPCs. NOTE(review): cnt is not synchronized;
// presumably the test drives RPCs sequentially — confirm before reuse.
type myTap struct {
	cnt int // number of EmptyCall RPCs seen by handle
}
2361
2362func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) {
2363	if info != nil {
2364		if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" {
2365			t.cnt++
2366		} else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" {
2367			return nil, fmt.Errorf("tap error")
2368		}
2369	}
2370	return ctx, nil
2371}
2372
// testTap installs a myTap handle on the server and verifies that (1) an
// EmptyCall passes through and increments the tap's counter, and (2) a
// UnaryCall, which the tap rejects, fails on the client with Unavailable.
func testTap(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	ttap := &myTap{}
	te.tapHandle = ttap.handle
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	if ttap.cnt != 1 {
		t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt)
	}

	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31)
	if err != nil {
		t.Fatal(err)
	}

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: 45,
		Payload:      payload,
	}
	// The tap returns an error for UnaryCall; the client observes this as
	// codes.Unavailable.
	if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
	}
}
2409
2410// healthCheck is a helper function to make a unary health check RPC and return
2411// the response.
2412func healthCheck(d time.Duration, cc *grpc.ClientConn, service string) (*healthpb.HealthCheckResponse, error) {
2413	ctx, cancel := context.WithTimeout(context.Background(), d)
2414	defer cancel()
2415	hc := healthgrpc.NewHealthClient(cc)
2416	return hc.Check(ctx, &healthpb.HealthCheckRequest{Service: service})
2417}
2418
2419// verifyHealthCheckStatus is a helper function to verify that the current
2420// health status of the service matches the one passed in 'wantStatus'.
2421func verifyHealthCheckStatus(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantStatus healthpb.HealthCheckResponse_ServingStatus) {
2422	t.Helper()
2423	resp, err := healthCheck(d, cc, service)
2424	if err != nil {
2425		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2426	}
2427	if resp.Status != wantStatus {
2428		t.Fatalf("Got the serving status %v, want %v", resp.Status, wantStatus)
2429	}
2430}
2431
2432// verifyHealthCheckErrCode is a helper function to verify that a unary health
2433// check RPC returns an error with a code set to 'wantCode'.
2434func verifyHealthCheckErrCode(t *testing.T, d time.Duration, cc *grpc.ClientConn, service string, wantCode codes.Code) {
2435	t.Helper()
2436	if _, err := healthCheck(d, cc, service); status.Code(err) != wantCode {
2437		t.Fatalf("Health/Check() got errCode %v, want %v", status.Code(err), wantCode)
2438	}
2439}
2440
2441// newHealthCheckStream is a helper function to start a health check streaming
2442// RPC, and returns the stream.
2443func newHealthCheckStream(t *testing.T, cc *grpc.ClientConn, service string) (healthgrpc.Health_WatchClient, context.CancelFunc) {
2444	t.Helper()
2445	ctx, cancel := context.WithCancel(context.Background())
2446	hc := healthgrpc.NewHealthClient(cc)
2447	stream, err := hc.Watch(ctx, &healthpb.HealthCheckRequest{Service: service})
2448	if err != nil {
2449		t.Fatalf("hc.Watch(_, %v) failed: %v", service, err)
2450	}
2451	return stream, cancel
2452}
2453
2454// healthWatchChecker is a helper function to verify that the next health
2455// status returned on the given stream matches the one passed in 'wantStatus'.
2456func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, wantStatus healthpb.HealthCheckResponse_ServingStatus) {
2457	t.Helper()
2458	response, err := stream.Recv()
2459	if err != nil {
2460		t.Fatalf("stream.Recv() failed: %v", err)
2461	}
2462	if response.Status != wantStatus {
2463		t.Fatalf("got servingStatus %v, want %v", response.Status, wantStatus)
2464	}
2465}
2466
2467// TestHealthCheckSuccess invokes the unary Check() RPC on the health server in
2468// a successful case.
2469func (s) TestHealthCheckSuccess(t *testing.T) {
2470	for _, e := range listTestEnv() {
2471		testHealthCheckSuccess(t, e)
2472	}
2473}
2474
2475func testHealthCheckSuccess(t *testing.T, e env) {
2476	te := newTest(t, e)
2477	te.enableHealthServer = true
2478	te.startServer(&testServer{security: e.security})
2479	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2480	defer te.tearDown()
2481
2482	verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.OK)
2483}
2484
2485// TestHealthCheckFailure invokes the unary Check() RPC on the health server
2486// with an expired context and expects the RPC to fail.
2487func (s) TestHealthCheckFailure(t *testing.T) {
2488	for _, e := range listTestEnv() {
2489		testHealthCheckFailure(t, e)
2490	}
2491}
2492
2493func testHealthCheckFailure(t *testing.T, e env) {
2494	te := newTest(t, e)
2495	te.declareLogNoise(
2496		"Failed to dial ",
2497		"grpc: the client connection is closing; please retry",
2498	)
2499	te.enableHealthServer = true
2500	te.startServer(&testServer{security: e.security})
2501	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2502	defer te.tearDown()
2503
2504	verifyHealthCheckErrCode(t, 0*time.Second, te.clientConn(), defaultHealthService, codes.DeadlineExceeded)
2505	awaitNewConnLogOutput()
2506}
2507
2508// TestHealthCheckOff makes a unary Check() RPC on the health server where the
2509// health status of the defaultHealthService is not set, and therefore expects
2510// an error code 'codes.NotFound'.
2511func (s) TestHealthCheckOff(t *testing.T) {
2512	for _, e := range listTestEnv() {
2513		// TODO(bradfitz): Temporarily skip this env due to #619.
2514		if e.name == "handler-tls" {
2515			continue
2516		}
2517		testHealthCheckOff(t, e)
2518	}
2519}
2520
2521func testHealthCheckOff(t *testing.T, e env) {
2522	te := newTest(t, e)
2523	te.enableHealthServer = true
2524	te.startServer(&testServer{security: e.security})
2525	defer te.tearDown()
2526
2527	verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.NotFound)
2528}
2529
2530// TestHealthWatchMultipleClients makes a streaming Watch() RPC on the health
2531// server with multiple clients and expects the same status on both streams.
2532func (s) TestHealthWatchMultipleClients(t *testing.T) {
2533	for _, e := range listTestEnv() {
2534		testHealthWatchMultipleClients(t, e)
2535	}
2536}
2537
2538func testHealthWatchMultipleClients(t *testing.T, e env) {
2539	te := newTest(t, e)
2540	te.enableHealthServer = true
2541	te.startServer(&testServer{security: e.security})
2542	defer te.tearDown()
2543
2544	cc := te.clientConn()
2545	stream1, cf1 := newHealthCheckStream(t, cc, defaultHealthService)
2546	defer cf1()
2547	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2548
2549	stream2, cf2 := newHealthCheckStream(t, cc, defaultHealthService)
2550	defer cf2()
2551	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2552
2553	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
2554	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
2555	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING)
2556}
2557
2558// TestHealthWatchSameStatusmakes a streaming Watch() RPC on the health server
2559// and makes sure that the health status of the server is as expected after
2560// multiple calls to SetServingStatus with the same status.
2561func (s) TestHealthWatchSameStatus(t *testing.T) {
2562	for _, e := range listTestEnv() {
2563		testHealthWatchSameStatus(t, e)
2564	}
2565}
2566
2567func testHealthWatchSameStatus(t *testing.T, e env) {
2568	te := newTest(t, e)
2569	te.enableHealthServer = true
2570	te.startServer(&testServer{security: e.security})
2571	defer te.tearDown()
2572
2573	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
2574	defer cf()
2575
2576	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2577	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2578	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2579	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2580	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
2581	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
2582}
2583
2584// TestHealthWatchServiceStatusSetBeforeStartingServer starts a health server
2585// on which the health status for the defaultService is set before the gRPC
2586// server is started, and expects the correct health status to be returned.
2587func (s) TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) {
2588	for _, e := range listTestEnv() {
2589		testHealthWatchSetServiceStatusBeforeStartingServer(t, e)
2590	}
2591}
2592
2593func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) {
2594	hs := health.NewServer()
2595	te := newTest(t, e)
2596	te.healthServer = hs
2597	hs.SetServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2598	te.startServer(&testServer{security: e.security})
2599	defer te.tearDown()
2600
2601	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
2602	defer cf()
2603	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2604}
2605
2606// TestHealthWatchDefaultStatusChange verifies the simple case where the
2607// service starts off with a SERVICE_UNKNOWN status (because SetServingStatus
2608// hasn't been called yet) and then moves to SERVING after SetServingStatus is
2609// called.
2610func (s) TestHealthWatchDefaultStatusChange(t *testing.T) {
2611	for _, e := range listTestEnv() {
2612		testHealthWatchDefaultStatusChange(t, e)
2613	}
2614}
2615
2616func testHealthWatchDefaultStatusChange(t *testing.T, e env) {
2617	te := newTest(t, e)
2618	te.enableHealthServer = true
2619	te.startServer(&testServer{security: e.security})
2620	defer te.tearDown()
2621
2622	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
2623	defer cf()
2624	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2625	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2626	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2627}
2628
2629// TestHealthWatchSetServiceStatusBeforeClientCallsWatch verifies the case
2630// where the health status is set to SERVING before the client calls Watch().
2631func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) {
2632	for _, e := range listTestEnv() {
2633		testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e)
2634	}
2635}
2636
2637func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) {
2638	te := newTest(t, e)
2639	te.enableHealthServer = true
2640	te.startServer(&testServer{security: e.security})
2641	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2642	defer te.tearDown()
2643
2644	stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
2645	defer cf()
2646	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2647}
2648
2649// TestHealthWatchOverallServerHealthChange verifies setting the overall status
2650// of the server by using the empty service name.
2651func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) {
2652	for _, e := range listTestEnv() {
2653		testHealthWatchOverallServerHealthChange(t, e)
2654	}
2655}
2656
2657func testHealthWatchOverallServerHealthChange(t *testing.T, e env) {
2658	te := newTest(t, e)
2659	te.enableHealthServer = true
2660	te.startServer(&testServer{security: e.security})
2661	defer te.tearDown()
2662
2663	stream, cf := newHealthCheckStream(t, te.clientConn(), "")
2664	defer cf()
2665	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2666	te.setHealthServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
2667	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
2668}
2669
2670// TestUnknownHandler verifies that an expected error is returned (by setting
2671// the unknownHandler on the server) for a service which is not exposed to the
2672// client.
2673func (s) TestUnknownHandler(t *testing.T) {
2674	// An example unknownHandler that returns a different code and a different
2675	// method, making sure that we do not expose what methods are implemented to
2676	// a client that is not authenticated.
2677	unknownHandler := func(srv interface{}, stream grpc.ServerStream) error {
2678		return status.Error(codes.Unauthenticated, "user unauthenticated")
2679	}
2680	for _, e := range listTestEnv() {
2681		// TODO(bradfitz): Temporarily skip this env due to #619.
2682		if e.name == "handler-tls" {
2683			continue
2684		}
2685		testUnknownHandler(t, e, unknownHandler)
2686	}
2687}
2688
2689func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
2690	te := newTest(t, e)
2691	te.unknownHandler = unknownHandler
2692	te.startServer(&testServer{security: e.security})
2693	defer te.tearDown()
2694	verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), "", codes.Unauthenticated)
2695}
2696
2697// TestHealthCheckServingStatus makes a streaming Watch() RPC on the health
2698// server and verifies a bunch of health status transitions.
2699func (s) TestHealthCheckServingStatus(t *testing.T) {
2700	for _, e := range listTestEnv() {
2701		testHealthCheckServingStatus(t, e)
2702	}
2703}
2704
2705func testHealthCheckServingStatus(t *testing.T, e env) {
2706	te := newTest(t, e)
2707	te.enableHealthServer = true
2708	te.startServer(&testServer{security: e.security})
2709	defer te.tearDown()
2710
2711	cc := te.clientConn()
2712	verifyHealthCheckStatus(t, 1*time.Second, cc, "", healthpb.HealthCheckResponse_SERVING)
2713	verifyHealthCheckErrCode(t, 1*time.Second, cc, defaultHealthService, codes.NotFound)
2714	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2715	verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_SERVING)
2716	te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
2717	verifyHealthCheckStatus(t, 1*time.Second, cc, defaultHealthService, healthpb.HealthCheckResponse_NOT_SERVING)
2718}
2719
2720func (s) TestEmptyUnaryWithUserAgent(t *testing.T) {
2721	for _, e := range listTestEnv() {
2722		testEmptyUnaryWithUserAgent(t, e)
2723	}
2724}
2725
2726func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
2727	te := newTest(t, e)
2728	te.userAgent = testAppUA
2729	te.startServer(&testServer{security: e.security})
2730	defer te.tearDown()
2731
2732	cc := te.clientConn()
2733	tc := testpb.NewTestServiceClient(cc)
2734	var header metadata.MD
2735	reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header))
2736	if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
2737		t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
2738	}
2739	if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
2740		t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
2741	}
2742
2743	te.srv.Stop()
2744}
2745
2746func (s) TestFailedEmptyUnary(t *testing.T) {
2747	for _, e := range listTestEnv() {
2748		if e.name == "handler-tls" {
2749			// This test covers status details, but
2750			// Grpc-Status-Details-Bin is not support in handler_server.
2751			continue
2752		}
2753		testFailedEmptyUnary(t, e)
2754	}
2755}
2756
2757func testFailedEmptyUnary(t *testing.T, e env) {
2758	te := newTest(t, e)
2759	te.userAgent = failAppUA
2760	te.startServer(&testServer{security: e.security})
2761	defer te.tearDown()
2762	tc := testpb.NewTestServiceClient(te.clientConn())
2763
2764	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2765	wantErr := detailedError
2766	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) {
2767		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
2768	}
2769}
2770
2771func (s) TestLargeUnary(t *testing.T) {
2772	for _, e := range listTestEnv() {
2773		testLargeUnary(t, e)
2774	}
2775}
2776
2777func testLargeUnary(t *testing.T, e env) {
2778	te := newTest(t, e)
2779	te.startServer(&testServer{security: e.security})
2780	defer te.tearDown()
2781	tc := testpb.NewTestServiceClient(te.clientConn())
2782
2783	const argSize = 271828
2784	const respSize = 314159
2785
2786	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2787	if err != nil {
2788		t.Fatal(err)
2789	}
2790
2791	req := &testpb.SimpleRequest{
2792		ResponseType: testpb.PayloadType_COMPRESSABLE,
2793		ResponseSize: respSize,
2794		Payload:      payload,
2795	}
2796	reply, err := tc.UnaryCall(context.Background(), req)
2797	if err != nil {
2798		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2799	}
2800	pt := reply.GetPayload().GetType()
2801	ps := len(reply.GetPayload().GetBody())
2802	if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2803		t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
2804	}
2805}
2806
2807// Test backward-compatibility API for setting msg size limit.
2808func (s) TestExceedMsgLimit(t *testing.T) {
2809	for _, e := range listTestEnv() {
2810		testExceedMsgLimit(t, e)
2811	}
2812}
2813
// testExceedMsgLimit configures the same maximum message size on both the
// client and the server (via the backward-compatibility fields) and verifies
// that an over-limit message is rejected with ResourceExhausted in all four
// combinations: unary and streaming, inbound to the server and inbound to the
// client.
func testExceedMsgLimit(t *testing.T, e env) {
	te := newTest(t, e)
	maxMsgSize := 1024
	// Apply the same limit on both ends so each direction can be exercised.
	te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// One byte over the limit must be rejected; a one-byte payload must pass.
	largeSize := int32(maxMsgSize + 1)
	const smallSize = 1

	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}

	// Make sure the server cannot receive a unary RPC of largeSize.
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: smallSize,
		Payload:      largePayload,
	}
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}
	// Make sure the client cannot receive a unary RPC of largeSize.
	// The same request object is reused with the sizes swapped.
	req.ResponseSize = largeSize
	req.Payload = smallPayload
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Make sure the server cannot receive a streaming RPC of largeSize.
	stream, err := tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}

	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            largePayload,
	}
	// Send succeeds locally; the rejection surfaces on the next Recv.
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test on client side for streaming RPC.
	// A fresh stream is needed; the previous one terminated with an error.
	stream, err = tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Reuse sreq/respParam with the over-limit size on the response side.
	respParam[0].Size = largeSize
	sreq.Payload = smallPayload
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
}
2887
2888func (s) TestPeerClientSide(t *testing.T) {
2889	for _, e := range listTestEnv() {
2890		testPeerClientSide(t, e)
2891	}
2892}
2893
2894func testPeerClientSide(t *testing.T, e env) {
2895	te := newTest(t, e)
2896	te.userAgent = testAppUA
2897	te.startServer(&testServer{security: e.security})
2898	defer te.tearDown()
2899	tc := testpb.NewTestServiceClient(te.clientConn())
2900	peer := new(peer.Peer)
2901	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
2902		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2903	}
2904	pa := peer.Addr.String()
2905	if e.network == "unix" {
2906		if pa != te.srvAddr {
2907			t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2908		}
2909		return
2910	}
2911	_, pp, err := net.SplitHostPort(pa)
2912	if err != nil {
2913		t.Fatalf("Failed to parse address from peer.")
2914	}
2915	_, sp, err := net.SplitHostPort(te.srvAddr)
2916	if err != nil {
2917		t.Fatalf("Failed to parse address of test server.")
2918	}
2919	if pp != sp {
2920		t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2921	}
2922}
2923
2924// TestPeerNegative tests that if call fails setting peer
2925// doesn't cause a segmentation fault.
2926// issue#1141 https://github.com/grpc/grpc-go/issues/1141
2927func (s) TestPeerNegative(t *testing.T) {
2928	for _, e := range listTestEnv() {
2929		testPeerNegative(t, e)
2930	}
2931}
2932
2933func testPeerNegative(t *testing.T, e env) {
2934	te := newTest(t, e)
2935	te.startServer(&testServer{security: e.security})
2936	defer te.tearDown()
2937
2938	cc := te.clientConn()
2939	tc := testpb.NewTestServiceClient(cc)
2940	peer := new(peer.Peer)
2941	ctx, cancel := context.WithCancel(context.Background())
2942	cancel()
2943	tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
2944}
2945
2946func (s) TestPeerFailedRPC(t *testing.T) {
2947	for _, e := range listTestEnv() {
2948		testPeerFailedRPC(t, e)
2949	}
2950}
2951
// testPeerFailedRPC checks that the peer address is still populated when an
// RPC fails (here, rejected by the server's 1KB receive-size limit), and that
// it matches the test server's address — exactly for unix sockets, by port
// otherwise.
func testPeerFailedRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.maxServerReceiveMsgSize = newInt(1 * 1024)
	te.startServer(&testServer{security: e.security})

	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// first make a successful request to the server
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}

	// make a second request that will be rejected by the server
	// (5KB exceeds the 1KB server receive limit configured above)
	const largeSize = 5 * 1024
	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		Payload:      largePayload,
	}

	peer := new(peer.Peer)
	if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	} else {
		// Despite the failure, the peer must have been filled in.
		pa := peer.Addr.String()
		if e.network == "unix" {
			if pa != te.srvAddr {
				t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
			}
			return
		}
		_, pp, err := net.SplitHostPort(pa)
		if err != nil {
			t.Fatalf("Failed to parse address from peer.")
		}
		_, sp, err := net.SplitHostPort(te.srvAddr)
		if err != nil {
			t.Fatalf("Failed to parse address of test server.")
		}
		if pp != sp {
			t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
		}
	}
}
3000
3001func (s) TestMetadataUnaryRPC(t *testing.T) {
3002	for _, e := range listTestEnv() {
3003		testMetadataUnaryRPC(t, e)
3004	}
3005}
3006
3007func testMetadataUnaryRPC(t *testing.T, e env) {
3008	te := newTest(t, e)
3009	te.startServer(&testServer{security: e.security})
3010	defer te.tearDown()
3011	tc := testpb.NewTestServiceClient(te.clientConn())
3012
3013	const argSize = 2718
3014	const respSize = 314
3015
3016	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3017	if err != nil {
3018		t.Fatal(err)
3019	}
3020
3021	req := &testpb.SimpleRequest{
3022		ResponseType: testpb.PayloadType_COMPRESSABLE,
3023		ResponseSize: respSize,
3024		Payload:      payload,
3025	}
3026	var header, trailer metadata.MD
3027	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3028	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
3029		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3030	}
3031	// Ignore optional response headers that Servers may set:
3032	if header != nil {
3033		delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
3034		delete(header, "date")    // the Date header is also optional
3035		delete(header, "user-agent")
3036		delete(header, "content-type")
3037	}
3038	if !reflect.DeepEqual(header, testMetadata) {
3039		t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
3040	}
3041	if !reflect.DeepEqual(trailer, testTrailerMetadata) {
3042		t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
3043	}
3044}
3045
3046func (s) TestMetadataOrderUnaryRPC(t *testing.T) {
3047	for _, e := range listTestEnv() {
3048		testMetadataOrderUnaryRPC(t, e)
3049	}
3050}
3051
3052func testMetadataOrderUnaryRPC(t *testing.T, e env) {
3053	te := newTest(t, e)
3054	te.startServer(&testServer{security: e.security})
3055	defer te.tearDown()
3056	tc := testpb.NewTestServiceClient(te.clientConn())
3057
3058	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3059	ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value2")
3060	ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value3")
3061
3062	// using Join to built expected metadata instead of FromOutgoingContext
3063	newMetadata := metadata.Join(testMetadata, metadata.Pairs("key1", "value2", "key1", "value3"))
3064
3065	var header metadata.MD
3066	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.Header(&header)); err != nil {
3067		t.Fatal(err)
3068	}
3069
3070	// Ignore optional response headers that Servers may set:
3071	if header != nil {
3072		delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
3073		delete(header, "date")    // the Date header is also optional
3074		delete(header, "user-agent")
3075		delete(header, "content-type")
3076	}
3077
3078	if !reflect.DeepEqual(header, newMetadata) {
3079		t.Fatalf("Received header metadata %v, want %v", header, newMetadata)
3080	}
3081}
3082
3083func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) {
3084	for _, e := range listTestEnv() {
3085		testMultipleSetTrailerUnaryRPC(t, e)
3086	}
3087}
3088
3089func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
3090	te := newTest(t, e)
3091	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
3092	defer te.tearDown()
3093	tc := testpb.NewTestServiceClient(te.clientConn())
3094
3095	const (
3096		argSize  = 1
3097		respSize = 1
3098	)
3099	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3100	if err != nil {
3101		t.Fatal(err)
3102	}
3103
3104	req := &testpb.SimpleRequest{
3105		ResponseType: testpb.PayloadType_COMPRESSABLE,
3106		ResponseSize: respSize,
3107		Payload:      payload,
3108	}
3109	var trailer metadata.MD
3110	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3111	if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil {
3112		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3113	}
3114	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
3115	if !reflect.DeepEqual(trailer, expectedTrailer) {
3116		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
3117	}
3118}
3119
3120func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) {
3121	for _, e := range listTestEnv() {
3122		testMultipleSetTrailerStreamingRPC(t, e)
3123	}
3124}
3125
3126func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
3127	te := newTest(t, e)
3128	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
3129	defer te.tearDown()
3130	tc := testpb.NewTestServiceClient(te.clientConn())
3131
3132	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3133	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
3134	if err != nil {
3135		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3136	}
3137	if err := stream.CloseSend(); err != nil {
3138		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3139	}
3140	if _, err := stream.Recv(); err != io.EOF {
3141		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3142	}
3143
3144	trailer := stream.Trailer()
3145	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
3146	if !reflect.DeepEqual(trailer, expectedTrailer) {
3147		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
3148	}
3149}
3150
3151func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) {
3152	for _, e := range listTestEnv() {
3153		if e.name == "handler-tls" {
3154			continue
3155		}
3156		testSetAndSendHeaderUnaryRPC(t, e)
3157	}
3158}
3159
3160// To test header metadata is sent on SendHeader().
3161func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
3162	te := newTest(t, e)
3163	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
3164	defer te.tearDown()
3165	tc := testpb.NewTestServiceClient(te.clientConn())
3166
3167	const (
3168		argSize  = 1
3169		respSize = 1
3170	)
3171	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3172	if err != nil {
3173		t.Fatal(err)
3174	}
3175
3176	req := &testpb.SimpleRequest{
3177		ResponseType: testpb.PayloadType_COMPRESSABLE,
3178		ResponseSize: respSize,
3179		Payload:      payload,
3180	}
3181	var header metadata.MD
3182	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3183	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
3184		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3185	}
3186	delete(header, "user-agent")
3187	delete(header, "content-type")
3188	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3189	if !reflect.DeepEqual(header, expectedHeader) {
3190		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3191	}
3192}
3193
3194func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) {
3195	for _, e := range listTestEnv() {
3196		if e.name == "handler-tls" {
3197			continue
3198		}
3199		testMultipleSetHeaderUnaryRPC(t, e)
3200	}
3201}
3202
3203// To test header metadata is sent when sending response.
3204func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
3205	te := newTest(t, e)
3206	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3207	defer te.tearDown()
3208	tc := testpb.NewTestServiceClient(te.clientConn())
3209
3210	const (
3211		argSize  = 1
3212		respSize = 1
3213	)
3214	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3215	if err != nil {
3216		t.Fatal(err)
3217	}
3218
3219	req := &testpb.SimpleRequest{
3220		ResponseType: testpb.PayloadType_COMPRESSABLE,
3221		ResponseSize: respSize,
3222		Payload:      payload,
3223	}
3224
3225	var header metadata.MD
3226	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3227	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
3228		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3229	}
3230	delete(header, "user-agent")
3231	delete(header, "content-type")
3232	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3233	if !reflect.DeepEqual(header, expectedHeader) {
3234		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3235	}
3236}
3237
3238func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
3239	for _, e := range listTestEnv() {
3240		if e.name == "handler-tls" {
3241			continue
3242		}
3243		testMultipleSetHeaderUnaryRPCError(t, e)
3244	}
3245}
3246
3247// To test header metadata is sent when sending status.
3248func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
3249	te := newTest(t, e)
3250	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3251	defer te.tearDown()
3252	tc := testpb.NewTestServiceClient(te.clientConn())
3253
3254	const (
3255		argSize  = 1
3256		respSize = -1 // Invalid respSize to make RPC fail.
3257	)
3258	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3259	if err != nil {
3260		t.Fatal(err)
3261	}
3262
3263	req := &testpb.SimpleRequest{
3264		ResponseType: testpb.PayloadType_COMPRESSABLE,
3265		ResponseSize: respSize,
3266		Payload:      payload,
3267	}
3268	var header metadata.MD
3269	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3270	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil {
3271		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
3272	}
3273	delete(header, "user-agent")
3274	delete(header, "content-type")
3275	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3276	if !reflect.DeepEqual(header, expectedHeader) {
3277		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3278	}
3279}
3280
3281func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) {
3282	for _, e := range listTestEnv() {
3283		if e.name == "handler-tls" {
3284			continue
3285		}
3286		testSetAndSendHeaderStreamingRPC(t, e)
3287	}
3288}
3289
3290// To test header metadata is sent on SendHeader().
3291func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
3292	te := newTest(t, e)
3293	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
3294	defer te.tearDown()
3295	tc := testpb.NewTestServiceClient(te.clientConn())
3296
3297	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3298	stream, err := tc.FullDuplexCall(ctx)
3299	if err != nil {
3300		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3301	}
3302	if err := stream.CloseSend(); err != nil {
3303		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3304	}
3305	if _, err := stream.Recv(); err != io.EOF {
3306		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3307	}
3308
3309	header, err := stream.Header()
3310	if err != nil {
3311		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3312	}
3313	delete(header, "user-agent")
3314	delete(header, "content-type")
3315	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3316	if !reflect.DeepEqual(header, expectedHeader) {
3317		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3318	}
3319}
3320
3321func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) {
3322	for _, e := range listTestEnv() {
3323		if e.name == "handler-tls" {
3324			continue
3325		}
3326		testMultipleSetHeaderStreamingRPC(t, e)
3327	}
3328}
3329
3330// To test header metadata is sent when sending response.
3331func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
3332	te := newTest(t, e)
3333	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3334	defer te.tearDown()
3335	tc := testpb.NewTestServiceClient(te.clientConn())
3336
3337	const (
3338		argSize  = 1
3339		respSize = 1
3340	)
3341	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3342	stream, err := tc.FullDuplexCall(ctx)
3343	if err != nil {
3344		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3345	}
3346
3347	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3348	if err != nil {
3349		t.Fatal(err)
3350	}
3351
3352	req := &testpb.StreamingOutputCallRequest{
3353		ResponseType: testpb.PayloadType_COMPRESSABLE,
3354		ResponseParameters: []*testpb.ResponseParameters{
3355			{Size: respSize},
3356		},
3357		Payload: payload,
3358	}
3359	if err := stream.Send(req); err != nil {
3360		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3361	}
3362	if _, err := stream.Recv(); err != nil {
3363		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3364	}
3365	if err := stream.CloseSend(); err != nil {
3366		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3367	}
3368	if _, err := stream.Recv(); err != io.EOF {
3369		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3370	}
3371
3372	header, err := stream.Header()
3373	if err != nil {
3374		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3375	}
3376	delete(header, "user-agent")
3377	delete(header, "content-type")
3378	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3379	if !reflect.DeepEqual(header, expectedHeader) {
3380		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3381	}
3382
3383}
3384
3385func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
3386	for _, e := range listTestEnv() {
3387		if e.name == "handler-tls" {
3388			continue
3389		}
3390		testMultipleSetHeaderStreamingRPCError(t, e)
3391	}
3392}
3393
// To test header metadata is sent when sending status: the RPC is made to
// fail (invalid negative response size), yet the header set by the server
// must still be delivered to the client along with the error status.
func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	const (
		argSize  = 1
		respSize = -1 // Invalid response size to make the RPC fail.
	)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}

	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}

	req := &testpb.StreamingOutputCallRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: []*testpb.ResponseParameters{
			{Size: respSize},
		},
		Payload: payload,
	}
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
	}
	// The invalid response size must surface as an error on Recv.
	if _, err := stream.Recv(); err == nil {
		t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
	}

	header, err := stream.Header()
	if err != nil {
		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
	}
	// Drop transport-added entries before comparing.
	delete(header, "user-agent")
	delete(header, "content-type")
	expectedHeader := metadata.Join(testMetadata, testMetadata2)
	if !reflect.DeepEqual(header, expectedHeader) {
		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
	}
}
3446
3447// TestMalformedHTTP2Metadata verfies the returned error when the client
3448// sends an illegal metadata.
3449func (s) TestMalformedHTTP2Metadata(t *testing.T) {
3450	for _, e := range listTestEnv() {
3451		if e.name == "handler-tls" {
3452			// Failed with "server stops accepting new RPCs".
3453			// Server stops accepting new RPCs when the client sends an illegal http2 header.
3454			continue
3455		}
3456		testMalformedHTTP2Metadata(t, e)
3457	}
3458}
3459
3460func testMalformedHTTP2Metadata(t *testing.T, e env) {
3461	te := newTest(t, e)
3462	te.startServer(&testServer{security: e.security})
3463	defer te.tearDown()
3464	tc := testpb.NewTestServiceClient(te.clientConn())
3465
3466	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718)
3467	if err != nil {
3468		t.Fatal(err)
3469	}
3470
3471	req := &testpb.SimpleRequest{
3472		ResponseType: testpb.PayloadType_COMPRESSABLE,
3473		ResponseSize: 314,
3474		Payload:      payload,
3475	}
3476	ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata)
3477	if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal {
3478		t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
3479	}
3480}
3481
3482func (s) TestTransparentRetry(t *testing.T) {
3483	for _, e := range listTestEnv() {
3484		if e.name == "handler-tls" {
3485			// Fails with RST_STREAM / FLOW_CONTROL_ERROR
3486			continue
3487		}
3488		testTransparentRetry(t, e)
3489	}
3490}
3491
// This test makes sure RPCs are transparently retried when they receive a
// RST_STREAM with the REFUSED_STREAM error code, which the InTapHandle
// provokes by failing the first (successAttempt-1) attempts.
func testTransparentRetry(t *testing.T, e env) {
	te := newTest(t, e)
	// attempts counts tap-handler invocations; successAttempt is the attempt
	// number at which the handler starts letting RPCs through.
	// NOTE(review): both variables are written by the test goroutine and
	// read/written by the server-side tap handler without synchronization;
	// presumably the RPC round-trip orders the accesses — confirm under -race.
	attempts := 0
	successAttempt := 2
	te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
		attempts++
		if attempts < successAttempt {
			return nil, errors.New("not now")
		}
		return ctx, nil
	}
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tsc := testpb.NewTestServiceClient(cc)
	// Each case states how many attempts are needed to succeed and whether
	// the one-second deadline should instead produce Unavailable.
	testCases := []struct {
		successAttempt int
		failFast       bool
		errCode        codes.Code
	}{{
		successAttempt: 1,
	}, {
		successAttempt: 2,
	}, {
		successAttempt: 3,
		errCode:        codes.Unavailable,
	}, {
		successAttempt: 1,
		failFast:       true,
	}, {
		successAttempt: 2,
		failFast:       true,
	}, {
		successAttempt: 3,
		failFast:       true,
		errCode:        codes.Unavailable,
	}}
	for _, tc := range testCases {
		// Reset the shared counters for this case before issuing the RPC.
		attempts = 0
		successAttempt = tc.successAttempt

		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!tc.failFast))
		cancel()
		if status.Code(err) != tc.errCode {
			t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode)
		}
	}
}
3544
3545func (s) TestCancel(t *testing.T) {
3546	for _, e := range listTestEnv() {
3547		testCancel(t, e)
3548	}
3549}
3550
3551func testCancel(t *testing.T, e env) {
3552	te := newTest(t, e)
3553	te.declareLogNoise("grpc: the client connection is closing; please retry")
3554	te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
3555	defer te.tearDown()
3556
3557	cc := te.clientConn()
3558	tc := testpb.NewTestServiceClient(cc)
3559
3560	const argSize = 2718
3561	const respSize = 314
3562
3563	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3564	if err != nil {
3565		t.Fatal(err)
3566	}
3567
3568	req := &testpb.SimpleRequest{
3569		ResponseType: testpb.PayloadType_COMPRESSABLE,
3570		ResponseSize: respSize,
3571		Payload:      payload,
3572	}
3573	ctx, cancel := context.WithCancel(context.Background())
3574	time.AfterFunc(1*time.Millisecond, cancel)
3575	if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled {
3576		t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
3577	}
3578	awaitNewConnLogOutput()
3579}
3580
3581func (s) TestCancelNoIO(t *testing.T) {
3582	for _, e := range listTestEnv() {
3583		testCancelNoIO(t, e)
3584	}
3585}
3586
// testCancelNoIO verifies that canceling an idle stream frees its
// concurrent-stream slot: with MaxConcurrentStreams of 1, a blocked second
// RPC proceeds once the first (which never performed any I/O) is canceled.
func testCancelNoIO(t *testing.T, e env) {
	te := newTest(t, e)
	te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
	te.maxStream = 1 // Only allows 1 live stream per server transport.
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	// Start one blocked RPC for which we'll never send streaming
	// input. This will consume the 1 maximum concurrent streams,
	// causing future RPCs to hang.
	ctx, cancelFirst := context.WithCancel(context.Background())
	_, err := tc.StreamingInputCall(ctx)
	if err != nil {
		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
	}

	// Loop until the ClientConn receives the initial settings
	// frame from the server, notifying it about the maximum
	// concurrent streams. We know when it's received it because
	// an RPC will fail with codes.DeadlineExceeded instead of
	// succeeding.
	// TODO(bradfitz): add internal test hook for this (Issue 534)
	for {
		ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond)
		_, err := tc.StreamingInputCall(ctx)
		cancelSecond()
		if err == nil {
			continue
		}
		if status.Code(err) == codes.DeadlineExceeded {
			break
		}
		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
	}
	// If there are any RPCs in flight before the client receives
	// the max streams setting, let them be expired.
	// TODO(bradfitz): add internal test hook for this (Issue 534)
	time.Sleep(50 * time.Millisecond)

	// Release the stream slot shortly after the third RPC below has started
	// waiting for it.
	go func() {
		time.Sleep(50 * time.Millisecond)
		cancelFirst()
	}()

	// This should be blocked until the 1st is canceled, then succeed.
	ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond)
	if _, err := tc.StreamingInputCall(ctx); err != nil {
		t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
	}
	cancelThird()
}
3641
// The following tests the gRPC streaming RPC implementations.
// TODO(zhaoq): Have better coverage on error cases.
var (
	// reqSizes and respSizes are paired per round: round i sends a
	// reqSizes[i]-byte payload and expects a respSizes[i]-byte reply.
	reqSizes  = []int{27182, 8, 1828, 45904}
	respSizes = []int{31415, 9, 2653, 58979}
)
3648
3649func (s) TestNoService(t *testing.T) {
3650	for _, e := range listTestEnv() {
3651		testNoService(t, e)
3652	}
3653}
3654
3655func testNoService(t *testing.T, e env) {
3656	te := newTest(t, e)
3657	te.startServer(nil)
3658	defer te.tearDown()
3659
3660	cc := te.clientConn()
3661	tc := testpb.NewTestServiceClient(cc)
3662
3663	stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true))
3664	if err != nil {
3665		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3666	}
3667	if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented {
3668		t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
3669	}
3670}
3671
3672func (s) TestPingPong(t *testing.T) {
3673	for _, e := range listTestEnv() {
3674		testPingPong(t, e)
3675	}
3676}
3677
3678func testPingPong(t *testing.T, e env) {
3679	te := newTest(t, e)
3680	te.startServer(&testServer{security: e.security})
3681	defer te.tearDown()
3682	tc := testpb.NewTestServiceClient(te.clientConn())
3683
3684	stream, err := tc.FullDuplexCall(te.ctx)
3685	if err != nil {
3686		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3687	}
3688	var index int
3689	for index < len(reqSizes) {
3690		respParam := []*testpb.ResponseParameters{
3691			{
3692				Size: int32(respSizes[index]),
3693			},
3694		}
3695
3696		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3697		if err != nil {
3698			t.Fatal(err)
3699		}
3700
3701		req := &testpb.StreamingOutputCallRequest{
3702			ResponseType:       testpb.PayloadType_COMPRESSABLE,
3703			ResponseParameters: respParam,
3704			Payload:            payload,
3705		}
3706		if err := stream.Send(req); err != nil {
3707			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3708		}
3709		reply, err := stream.Recv()
3710		if err != nil {
3711			t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3712		}
3713		pt := reply.GetPayload().GetType()
3714		if pt != testpb.PayloadType_COMPRESSABLE {
3715			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3716		}
3717		size := len(reply.GetPayload().GetBody())
3718		if size != int(respSizes[index]) {
3719			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3720		}
3721		index++
3722	}
3723	if err := stream.CloseSend(); err != nil {
3724		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3725	}
3726	if _, err := stream.Recv(); err != io.EOF {
3727		t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
3728	}
3729}
3730
3731func (s) TestMetadataStreamingRPC(t *testing.T) {
3732	for _, e := range listTestEnv() {
3733		testMetadataStreamingRPC(t, e)
3734	}
3735}
3736
// testMetadataStreamingRPC sends testMetadata with a full-duplex call and
// verifies the header metadata (fresh and cached), streams the request
// payloads from a goroutine, drains the responses, and finally checks the
// trailer metadata.
func testMetadataStreamingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	go func() {
		headerMD, err := stream.Header()
		if e.security == "tls" {
			delete(headerMD, "transport_security_type")
		}
		// Strip server-added keys so the comparison is against the echoed
		// testMetadata only.
		delete(headerMD, "trailer") // ignore if present
		delete(headerMD, "user-agent")
		delete(headerMD, "content-type")
		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
			t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
		}
		// test the cached value.
		headerMD, err = stream.Header()
		delete(headerMD, "trailer") // ignore if present
		delete(headerMD, "user-agent")
		delete(headerMD, "content-type")
		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
			t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
		}
		err = func() error {
			for index := 0; index < len(reqSizes); index++ {
				respParam := []*testpb.ResponseParameters{
					{
						Size: int32(respSizes[index]),
					},
				}

				payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
				if err != nil {
					return err
				}

				req := &testpb.StreamingOutputCallRequest{
					ResponseType:       testpb.PayloadType_COMPRESSABLE,
					ResponseParameters: respParam,
					Payload:            payload,
				}
				if err := stream.Send(req); err != nil {
					return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
				}
			}
			return nil
		}()
		// Tell the server we're done sending args.
		stream.CloseSend()
		if err != nil {
			t.Error(err)
		}
	}()
	// Drain all responses; the loop ends on the stream's terminal error.
	for {
		if _, err := stream.Recv(); err != nil {
			break
		}
	}
	// Trailer is only valid after Recv has returned the terminal error.
	trailerMD := stream.Trailer()
	if !reflect.DeepEqual(testTrailerMetadata, trailerMD) {
		t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata)
	}
}
3807
3808func (s) TestServerStreaming(t *testing.T) {
3809	for _, e := range listTestEnv() {
3810		testServerStreaming(t, e)
3811	}
3812}
3813
3814func testServerStreaming(t *testing.T, e env) {
3815	te := newTest(t, e)
3816	te.startServer(&testServer{security: e.security})
3817	defer te.tearDown()
3818	tc := testpb.NewTestServiceClient(te.clientConn())
3819
3820	respParam := make([]*testpb.ResponseParameters, len(respSizes))
3821	for i, s := range respSizes {
3822		respParam[i] = &testpb.ResponseParameters{
3823			Size: int32(s),
3824		}
3825	}
3826	req := &testpb.StreamingOutputCallRequest{
3827		ResponseType:       testpb.PayloadType_COMPRESSABLE,
3828		ResponseParameters: respParam,
3829	}
3830	stream, err := tc.StreamingOutputCall(context.Background(), req)
3831	if err != nil {
3832		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3833	}
3834	var rpcStatus error
3835	var respCnt int
3836	var index int
3837	for {
3838		reply, err := stream.Recv()
3839		if err != nil {
3840			rpcStatus = err
3841			break
3842		}
3843		pt := reply.GetPayload().GetType()
3844		if pt != testpb.PayloadType_COMPRESSABLE {
3845			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3846		}
3847		size := len(reply.GetPayload().GetBody())
3848		if size != int(respSizes[index]) {
3849			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3850		}
3851		index++
3852		respCnt++
3853	}
3854	if rpcStatus != io.EOF {
3855		t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
3856	}
3857	if respCnt != len(respSizes) {
3858		t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
3859	}
3860}
3861
3862func (s) TestFailedServerStreaming(t *testing.T) {
3863	for _, e := range listTestEnv() {
3864		testFailedServerStreaming(t, e)
3865	}
3866}
3867
// testFailedServerStreaming uses the failAppUA user agent, which makes the
// test server reject the RPC, and verifies Recv surfaces the expected
// DataLoss status error.
func testFailedServerStreaming(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = failAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	respParam := make([]*testpb.ResponseParameters, len(respSizes))
	for i, s := range respSizes {
		respParam[i] = &testpb.ResponseParameters{
			Size: int32(s),
		}
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
	}
	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
	stream, err := tc.StreamingOutputCall(ctx, req)
	if err != nil {
		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
	}
	wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA)
	if _, err := stream.Recv(); !equalError(err, wantErr) {
		t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
	}
}
3895
3896func equalError(x, y error) bool {
3897	return x == y || (x != nil && y != nil && x.Error() == y.Error())
3898}
3899
// concurrentSendServer is a TestServiceServer whose
// StreamingOutputCall makes ten serial Send calls, sending payloads
// "0".."9", inclusive.  TestServerStreamingConcurrent verifies they
// were received in the correct order, and that there were no races.
//
// All other TestServiceServer methods crash if called.
type concurrentSendServer struct {
	// Embedding provides (panicking) defaults for unimplemented methods.
	testpb.TestServiceServer
}
3909
3910func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
3911	for i := 0; i < 10; i++ {
3912		stream.Send(&testpb.StreamingOutputCallResponse{
3913			Payload: &testpb.Payload{
3914				Body: []byte{'0' + uint8(i)},
3915			},
3916		})
3917	}
3918	return nil
3919}
3920
3921// Tests doing a bunch of concurrent streaming output calls.
3922func (s) TestServerStreamingConcurrent(t *testing.T) {
3923	for _, e := range listTestEnv() {
3924		testServerStreamingConcurrent(t, e)
3925	}
3926}
3927
3928func testServerStreamingConcurrent(t *testing.T, e env) {
3929	te := newTest(t, e)
3930	te.startServer(concurrentSendServer{})
3931	defer te.tearDown()
3932
3933	cc := te.clientConn()
3934	tc := testpb.NewTestServiceClient(cc)
3935
3936	doStreamingCall := func() {
3937		req := &testpb.StreamingOutputCallRequest{}
3938		stream, err := tc.StreamingOutputCall(context.Background(), req)
3939		if err != nil {
3940			t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3941			return
3942		}
3943		var ngot int
3944		var buf bytes.Buffer
3945		for {
3946			reply, err := stream.Recv()
3947			if err == io.EOF {
3948				break
3949			}
3950			if err != nil {
3951				t.Fatal(err)
3952			}
3953			ngot++
3954			if buf.Len() > 0 {
3955				buf.WriteByte(',')
3956			}
3957			buf.Write(reply.GetPayload().GetBody())
3958		}
3959		if want := 10; ngot != want {
3960			t.Errorf("Got %d replies, want %d", ngot, want)
3961		}
3962		if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
3963			t.Errorf("Got replies %q; want %q", got, want)
3964		}
3965	}
3966
3967	var wg sync.WaitGroup
3968	for i := 0; i < 20; i++ {
3969		wg.Add(1)
3970		go func() {
3971			defer wg.Done()
3972			doStreamingCall()
3973		}()
3974	}
3975	wg.Wait()
3976
3977}
3978
3979func generatePayloadSizes() [][]int {
3980	reqSizes := [][]int{
3981		{27182, 8, 1828, 45904},
3982	}
3983
3984	num8KPayloads := 1024
3985	eightKPayloads := []int{}
3986	for i := 0; i < num8KPayloads; i++ {
3987		eightKPayloads = append(eightKPayloads, (1 << 13))
3988	}
3989	reqSizes = append(reqSizes, eightKPayloads)
3990
3991	num2MPayloads := 8
3992	twoMPayloads := []int{}
3993	for i := 0; i < num2MPayloads; i++ {
3994		twoMPayloads = append(twoMPayloads, (1 << 21))
3995	}
3996	reqSizes = append(reqSizes, twoMPayloads)
3997
3998	return reqSizes
3999}
4000
4001func (s) TestClientStreaming(t *testing.T) {
4002	for _, s := range generatePayloadSizes() {
4003		for _, e := range listTestEnv() {
4004			testClientStreaming(t, e, s)
4005		}
4006	}
4007}
4008
// testClientStreaming sends one request per entry in sizes on a
// client-streaming call and verifies the server's aggregated payload size
// equals the sum of all request payload sizes.
func testClientStreaming(t *testing.T, e env, sizes []int) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	ctx, cancel := context.WithTimeout(te.ctx, time.Second*30)
	defer cancel()
	stream, err := tc.StreamingInputCall(ctx)
	if err != nil {
		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
	}

	var sum int
	for _, s := range sizes {
		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
		if err != nil {
			t.Fatal(err)
		}

		req := &testpb.StreamingInputCallRequest{
			Payload: payload,
		}
		if err := stream.Send(req); err != nil {
			t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
		}
		sum += s
	}
	reply, err := stream.CloseAndRecv()
	if err != nil {
		t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
	}
	if reply.GetAggregatedPayloadSize() != int32(sum) {
		t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
	}
}
4045
4046func (s) TestClientStreamingError(t *testing.T) {
4047	for _, e := range listTestEnv() {
4048		if e.name == "handler-tls" {
4049			continue
4050		}
4051		testClientStreamingError(t, e)
4052	}
4053}
4054
// testClientStreamingError runs against a server configured with earlyFail,
// which aborts the RPC after the first message; it verifies that Send
// eventually observes io.EOF and CloseAndRecv surfaces codes.NotFound.
func testClientStreamingError(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security, earlyFail: true})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	stream, err := tc.StreamingInputCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
	if err != nil {
		t.Fatal(err)
	}

	req := &testpb.StreamingInputCallRequest{
		Payload: payload,
	}
	// The 1st request should go through.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
	}
	// Keep sending until the server's early failure propagates back to the
	// client as io.EOF on Send; only then is the RPC status available.
	for {
		if err := stream.Send(req); err != io.EOF {
			continue
		}
		if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound {
			t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
		}
		break
	}
}
4087
4088func (s) TestExceedMaxStreamsLimit(t *testing.T) {
4089	for _, e := range listTestEnv() {
4090		testExceedMaxStreamsLimit(t, e)
4091	}
4092}
4093
4094func testExceedMaxStreamsLimit(t *testing.T, e env) {
4095	te := newTest(t, e)
4096	te.declareLogNoise(
4097		"http2Client.notifyError got notified that the client transport was broken",
4098		"Conn.resetTransport failed to create client transport",
4099		"grpc: the connection is closing",
4100	)
4101	te.maxStream = 1 // Only allows 1 live stream per server transport.
4102	te.startServer(&testServer{security: e.security})
4103	defer te.tearDown()
4104
4105	cc := te.clientConn()
4106	tc := testpb.NewTestServiceClient(cc)
4107
4108	_, err := tc.StreamingInputCall(te.ctx)
4109	if err != nil {
4110		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
4111	}
4112	// Loop until receiving the new max stream setting from the server.
4113	for {
4114		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
4115		defer cancel()
4116		_, err := tc.StreamingInputCall(ctx)
4117		if err == nil {
4118			time.Sleep(50 * time.Millisecond)
4119			continue
4120		}
4121		if status.Code(err) == codes.DeadlineExceeded {
4122			break
4123		}
4124		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
4125	}
4126}
4127
4128func (s) TestStreamsQuotaRecovery(t *testing.T) {
4129	for _, e := range listTestEnv() {
4130		testStreamsQuotaRecovery(t, e)
4131	}
4132}
4133
// testStreamsQuotaRecovery verifies that with a 1-stream quota held by a
// live stream, unary RPCs time out, and that canceling the held stream
// returns its quota so a new stream can be created.
func testStreamsQuotaRecovery(t *testing.T, e env) {
	te := newTest(t, e)
	te.declareLogNoise(
		"http2Client.notifyError got notified that the client transport was broken",
		"Conn.resetTransport failed to create client transport",
		"grpc: the connection is closing",
	)
	te.maxStream = 1 // Allows 1 live stream.
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// This ctx/cancel pair owns the quota-holding stream; it is canceled
	// (and later reassigned) near the end of the test.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if _, err := tc.StreamingInputCall(ctx); err != nil {
		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err)
	}
	// Loop until the new max stream setting is effective.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
		_, err := tc.StreamingInputCall(ctx)
		cancel()
		if err == nil {
			time.Sleep(5 * time.Millisecond)
			continue
		}
		if status.Code(err) == codes.DeadlineExceeded {
			break
		}
		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
			if err != nil {
				t.Error(err)
				return
			}
			req := &testpb.SimpleRequest{
				ResponseType: testpb.PayloadType_COMPRESSABLE,
				ResponseSize: 1592,
				Payload:      payload,
			}
			// No rpc should go through due to the max streams limit.
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
			defer cancel()
			if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
				t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
			}
		}()
	}
	wg.Wait()

	cancel()
	// A new stream should be allowed after canceling the first one.
	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.StreamingInputCall(ctx); err != nil {
		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil)
	}
}
4200
4201func (s) TestCompressServerHasNoSupport(t *testing.T) {
4202	for _, e := range listTestEnv() {
4203		testCompressServerHasNoSupport(t, e)
4204	}
4205}
4206
// testCompressServerHasNoSupport sends compressed requests (via the client's
// nop compressor) to a server without compression support and verifies both
// unary and streaming RPCs fail with codes.Unimplemented.
func testCompressServerHasNoSupport(t *testing.T, e env) {
	te := newTest(t, e)
	te.serverCompression = false
	te.clientCompression = false
	te.clientNopCompression = true
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	const argSize = 271828
	const respSize = 314159
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: respSize,
		Payload:      payload,
	}
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.Unimplemented {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented)
	}
	// Streaming RPC
	stream, err := tc.FullDuplexCall(context.Background())
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Unimplemented {
		t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented)
	}
}
4239
4240func (s) TestCompressOK(t *testing.T) {
4241	for _, e := range listTestEnv() {
4242		testCompressOK(t, e)
4243	}
4244}
4245
// testCompressOK enables compression on both client and server and verifies
// a unary call and one round of a full-duplex call both succeed.
func testCompressOK(t *testing.T, e env) {
	te := newTest(t, e)
	te.serverCompression = true
	te.clientCompression = true
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// Unary call
	const argSize = 271828
	const respSize = 314159
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: respSize,
		Payload:      payload,
	}
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
	if _, err := tc.UnaryCall(ctx, req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
	}
	// Streaming RPC
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 31415,
		},
	}
	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
	if err != nil {
		t.Fatal(err)
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	// One reply is expected, then a clean io.EOF after CloseSend.
	stream.CloseSend()
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
	}
	if _, err := stream.Recv(); err != io.EOF {
		t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err)
	}
}
4302
4303func (s) TestIdentityEncoding(t *testing.T) {
4304	for _, e := range listTestEnv() {
4305		testIdentityEncoding(t, e)
4306	}
4307}
4308
// testIdentityEncoding verifies RPCs succeed when using the "identity"
// (no-op) compressor: a plain unary call, then one round of a full-duplex
// call opened with grpc.UseCompressor("identity").
func testIdentityEncoding(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// Unary call
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 5)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: 10,
		Payload:      payload,
	}
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
	if _, err := tc.UnaryCall(ctx, req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
	}
	// Streaming RPC
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.UseCompressor("identity"))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
	if err != nil {
		t.Fatal(err)
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: []*testpb.ResponseParameters{{Size: 10}},
		Payload:            payload,
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	// One reply is expected, then a clean io.EOF after CloseSend.
	stream.CloseSend()
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
	}
	if _, err := stream.Recv(); err != io.EOF {
		t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err)
	}
}
4356
4357func (s) TestUnaryClientInterceptor(t *testing.T) {
4358	for _, e := range listTestEnv() {
4359		testUnaryClientInterceptor(t, e)
4360	}
4361}
4362
4363func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
4364	err := invoker(ctx, method, req, reply, cc, opts...)
4365	if err == nil {
4366		return status.Error(codes.NotFound, "")
4367	}
4368	return err
4369}
4370
4371func testUnaryClientInterceptor(t *testing.T, e env) {
4372	te := newTest(t, e)
4373	te.userAgent = testAppUA
4374	te.unaryClientInt = failOkayRPC
4375	te.startServer(&testServer{security: e.security})
4376	defer te.tearDown()
4377
4378	tc := testpb.NewTestServiceClient(te.clientConn())
4379	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.NotFound {
4380		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound)
4381	}
4382}
4383
4384func (s) TestStreamClientInterceptor(t *testing.T) {
4385	for _, e := range listTestEnv() {
4386		testStreamClientInterceptor(t, e)
4387	}
4388}
4389
4390func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
4391	s, err := streamer(ctx, desc, cc, method, opts...)
4392	if err == nil {
4393		return nil, status.Error(codes.NotFound, "")
4394	}
4395	return s, nil
4396}
4397
4398func testStreamClientInterceptor(t *testing.T, e env) {
4399	te := newTest(t, e)
4400	te.streamClientInt = failOkayStream
4401	te.startServer(&testServer{security: e.security})
4402	defer te.tearDown()
4403
4404	tc := testpb.NewTestServiceClient(te.clientConn())
4405	respParam := []*testpb.ResponseParameters{
4406		{
4407			Size: int32(1),
4408		},
4409	}
4410	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
4411	if err != nil {
4412		t.Fatal(err)
4413	}
4414	req := &testpb.StreamingOutputCallRequest{
4415		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4416		ResponseParameters: respParam,
4417		Payload:            payload,
4418	}
4419	if _, err := tc.StreamingOutputCall(context.Background(), req); status.Code(err) != codes.NotFound {
4420		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound)
4421	}
4422}
4423
4424func (s) TestUnaryServerInterceptor(t *testing.T) {
4425	for _, e := range listTestEnv() {
4426		testUnaryServerInterceptor(t, e)
4427	}
4428}
4429
// errInjector is a unary server interceptor that rejects every RPC with
// codes.PermissionDenied without invoking the handler.
func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	return nil, status.Error(codes.PermissionDenied, "")
}
4433
4434func testUnaryServerInterceptor(t *testing.T, e env) {
4435	te := newTest(t, e)
4436	te.unaryServerInt = errInjector
4437	te.startServer(&testServer{security: e.security})
4438	defer te.tearDown()
4439
4440	tc := testpb.NewTestServiceClient(te.clientConn())
4441	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.PermissionDenied {
4442		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
4443	}
4444}
4445
4446func (s) TestStreamServerInterceptor(t *testing.T) {
4447	for _, e := range listTestEnv() {
4448		// TODO(bradfitz): Temporarily skip this env due to #619.
4449		if e.name == "handler-tls" {
4450			continue
4451		}
4452		testStreamServerInterceptor(t, e)
4453	}
4454}
4455
4456func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
4457	if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" {
4458		return handler(srv, ss)
4459	}
4460	// Reject the other methods.
4461	return status.Error(codes.PermissionDenied, "")
4462}
4463
4464func testStreamServerInterceptor(t *testing.T, e env) {
4465	te := newTest(t, e)
4466	te.streamServerInt = fullDuplexOnly
4467	te.startServer(&testServer{security: e.security})
4468	defer te.tearDown()
4469
4470	tc := testpb.NewTestServiceClient(te.clientConn())
4471	respParam := []*testpb.ResponseParameters{
4472		{
4473			Size: int32(1),
4474		},
4475	}
4476	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
4477	if err != nil {
4478		t.Fatal(err)
4479	}
4480	req := &testpb.StreamingOutputCallRequest{
4481		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4482		ResponseParameters: respParam,
4483		Payload:            payload,
4484	}
4485	s1, err := tc.StreamingOutputCall(context.Background(), req)
4486	if err != nil {
4487		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
4488	}
4489	if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied {
4490		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
4491	}
4492	s2, err := tc.FullDuplexCall(context.Background())
4493	if err != nil {
4494		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4495	}
4496	if err := s2.Send(req); err != nil {
4497		t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err)
4498	}
4499	if _, err := s2.Recv(); err != nil {
4500		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err)
4501	}
4502}
4503
// funcServer implements methods of TestServiceServer using funcs,
// similar to an http.HandlerFunc.
// Any unimplemented method will crash. Tests implement the method(s)
// they need.
type funcServer struct {
	testpb.TestServiceServer
	// Per-method handlers; a nil handler makes the corresponding
	// method panic when invoked.
	unaryCall          func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
	streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error
	fullDuplexCall     func(stream testpb.TestService_FullDuplexCallServer) error
}
4514
// UnaryCall delegates to the test-provided unaryCall func.
func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
	return s.unaryCall(ctx, in)
}
4518
// StreamingInputCall delegates to the test-provided streamingInputCall func.
func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
	return s.streamingInputCall(stream)
}
4522
// FullDuplexCall delegates to the test-provided fullDuplexCall func.
func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
	return s.fullDuplexCall(stream)
}
4526
4527func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
4528	for _, e := range listTestEnv() {
4529		testClientRequestBodyErrorUnexpectedEOF(t, e)
4530	}
4531}
4532
// testClientRequestBodyErrorUnexpectedEOF writes a gRPC frame header that
// promises 5 payload bytes but sets END_STREAM, and verifies the server
// responds (rather than crashing) and never invokes the handler.
func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
	te := newTest(t, e)
	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
		errUnexpectedCall := errors.New("unexpected call func server method")
		t.Error(errUnexpectedCall)
		return nil, errUnexpectedCall
	}}
	te.startServer(ts)
	defer te.tearDown()
	te.withServerTester(func(st *serverTester) {
		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
		// Say we have 5 bytes coming, but set END_STREAM flag:
		st.writeData(1, true, []byte{0, 0, 0, 0, 5})
		st.wantAnyFrame() // wait for server to crash (it used to crash)
	})
}
4549
4550func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
4551	for _, e := range listTestEnv() {
4552		testClientRequestBodyErrorCloseAfterLength(t, e)
4553	}
4554}
4555
// testClientRequestBodyErrorCloseAfterLength announces a 5-byte payload and
// then closes the connection; the server must not invoke the handler.
func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
	te := newTest(t, e)
	te.declareLogNoise("Server.processUnaryRPC failed to write status")
	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
		errUnexpectedCall := errors.New("unexpected call func server method")
		t.Error(errUnexpectedCall)
		return nil, errUnexpectedCall
	}}
	te.startServer(ts)
	defer te.tearDown()
	te.withServerTester(func(st *serverTester) {
		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
		// say we're sending 5 bytes, but then close the connection instead.
		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
		st.cc.Close()
	})
}
4573
4574func (s) TestClientRequestBodyErrorCancel(t *testing.T) {
4575	for _, e := range listTestEnv() {
4576		testClientRequestBodyErrorCancel(t, e)
4577	}
4578}
4579
// testClientRequestBodyErrorCancel verifies that a stream reset (cancel)
// arriving before the request body prevents the handler from being invoked,
// and that the connection remains usable for a subsequent RPC.
func testClientRequestBodyErrorCancel(t *testing.T, e env) {
	te := newTest(t, e)
	gotCall := make(chan bool, 1)
	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
		gotCall <- true
		return new(testpb.SimpleResponse), nil
	}}
	te.startServer(ts)
	defer te.tearDown()
	te.withServerTester(func(st *serverTester) {
		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
		// Say we have 5 bytes coming, but cancel it instead.
		st.writeRSTStream(1, http2.ErrCodeCancel)
		st.writeData(1, false, []byte{0, 0, 0, 0, 5})

		// Verify we didn't get a call yet.
		select {
		case <-gotCall:
			t.Fatal("unexpected call")
		default:
		}

		// And now send an uncanceled (but still invalid), just to get a response.
		st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall")
		st.writeData(3, true, []byte{0, 0, 0, 0, 0})
		<-gotCall
		st.wantAnyFrame()
	})
}
4609
4610func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
4611	for _, e := range listTestEnv() {
4612		testClientRequestBodyErrorCancelStreamingInput(t, e)
4613	}
4614}
4615
4616func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
4617	te := newTest(t, e)
4618	recvErr := make(chan error, 1)
4619	ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
4620		_, err := stream.Recv()
4621		recvErr <- err
4622		return nil
4623	}}
4624	te.startServer(ts)
4625	defer te.tearDown()
4626	te.withServerTester(func(st *serverTester) {
4627		st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall")
4628		// Say we have 5 bytes coming, but cancel it instead.
4629		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4630		st.writeRSTStream(1, http2.ErrCodeCancel)
4631
4632		var got error
4633		select {
4634		case got = <-recvErr:
4635		case <-time.After(3 * time.Second):
4636			t.Fatal("timeout waiting for error")
4637		}
4638		if grpc.Code(got) != codes.Canceled {
4639			t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
4640		}
4641	})
4642}
4643
4644func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) {
4645	for _, e := range listTestEnv() {
4646		if e.httpHandler {
4647			// httpHandler write won't be blocked on flow control window.
4648			continue
4649		}
4650		testClientResourceExhaustedCancelFullDuplex(t, e)
4651	}
4652}
4653
// testClientResourceExhaustedCancelFullDuplex verifies that when the client
// fails an RPC with ResourceExhausted (response exceeds its receive limit),
// the server-side handler's pending Send eventually fails with
// codes.Canceled.
func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) {
	te := newTest(t, e)
	recvErr := make(chan error, 1)
	ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
		defer close(recvErr)
		_, err := stream.Recv()
		if err != nil {
			return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err)
		}
		// create a payload that's larger than the default flow control window.
		// NOTE(review): the payload here is only 10 bytes, which is far below
		// any default flow control window; the limit that actually trips is
		// the client's maxReceiveMsgSize of 10 set below -- confirm intent.
		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10)
		if err != nil {
			return err
		}
		resp := &testpb.StreamingOutputCallResponse{
			Payload: payload,
		}
		ce := make(chan error, 1)
		// Send in a tight loop from a separate goroutine until the stream
		// breaks; the first Send error is reported on ce.
		go func() {
			var err error
			for {
				if err = stream.Send(resp); err != nil {
					break
				}
			}
			ce <- err
		}()
		select {
		case err = <-ce:
		case <-time.After(10 * time.Second):
			err = errors.New("10s timeout reached")
		}
		recvErr <- err
		return err
	}}
	te.startServer(ts)
	defer te.tearDown()
	// set a low limit on receive message size to error with Resource Exhausted on
	// client side when server send a large message.
	te.maxClientReceiveMsgSize = newInt(10)
	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	stream, err := tc.FullDuplexCall(context.Background())
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	req := &testpb.StreamingOutputCallRequest{}
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
	}
	// The oversized response must surface as ResourceExhausted on the client.
	if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
	// The server's send loop must observe the resulting cancellation.
	err = <-recvErr
	if status.Code(err) != codes.Canceled {
		t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled)
	}
}
4712
// clientFailCreds is a credentials.TransportCredentials implementation whose
// client-side handshake always fails; the server side is a pass-through.
type clientFailCreds struct{}

// ServerHandshake accepts the raw connection without authentication.
func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	return rawConn, nil, nil
}

// ClientHandshake always fails with an error.
func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	return nil, nil, fmt.Errorf("client handshake fails with fatal error")
}

// Info returns an empty ProtocolInfo.
func (c *clientFailCreds) Info() credentials.ProtocolInfo {
	return credentials.ProtocolInfo{}
}

// Clone returns the receiver itself; the type is stateless.
func (c *clientFailCreds) Clone() credentials.TransportCredentials {
	return c
}

// OverrideServerName is a no-op.
func (c *clientFailCreds) OverrideServerName(s string) error {
	return nil
}
4730
// This test makes sure that failfast RPCs fail if client handshake fails with
// fatal errors.
func (s) TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to listen: %v", err)
	}
	defer lis.Close()

	// No server is started on lis; the handshake fails on the client side
	// before any traffic is exchanged.
	cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{}))
	if err != nil {
		t.Fatalf("grpc.Dial(_) = %v", err)
	}
	defer cc.Close()

	tc := testpb.NewTestServiceClient(cc)
	// This unary call should fail, but not timeout.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// WaitForReady(false) makes the RPC fail-fast; it must return
	// Unavailable rather than wait out the one-second deadline.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err)
	}
}
4754
// TestFlowControlLogicalRace stresses flow control by repeatedly starting a
// server stream, reading only a couple of messages, and canceling.
func (s) TestFlowControlLogicalRace(t *testing.T) {
	// Test for a regression of https://github.com/grpc/grpc-go/issues/632,
	// and other flow control bugs.

	const (
		itemCount   = 100
		itemSize    = 1 << 10
		recvCount   = 2
		maxFailures = 3

		requestTimeout = time.Second * 5
	)

	requestCount := 10000
	if raceMode {
		// The race detector slows everything down; run fewer iterations.
		requestCount = 1000
	}

	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to listen: %v", err)
	}
	defer lis.Close()

	s := grpc.NewServer()
	testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{
		itemCount: itemCount,
		itemSize:  itemSize,
	})
	defer s.Stop()

	go s.Serve(lis)

	ctx := context.Background()

	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
	}
	defer cc.Close()
	cl := testpb.NewTestServiceClient(cc)

	failures := 0
	for i := 0; i < requestCount; i++ {
		ctx, cancel := context.WithTimeout(ctx, requestTimeout)
		output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
		if err != nil {
			t.Fatalf("StreamingOutputCall; err = %q", err)
		}

		j := 0
	loop:
		for ; j < recvCount; j++ {
			_, err := output.Recv()
			if err != nil {
				if err == io.EOF {
					break loop
				}
				switch status.Code(err) {
				case codes.DeadlineExceeded:
					break loop
				default:
					t.Fatalf("Recv; err = %q", err)
				}
			}
		}
		// Cancel the stream and wait for the context to fully settle
		// before issuing the next request.
		cancel()
		<-ctx.Done()

		if j < recvCount {
			t.Errorf("got %d responses to request %d", j, i)
			failures++
			if failures >= maxFailures {
				// Continue past the first failure to see if the connection is
				// entirely broken, or if only a single RPC was affected
				break
			}
		}
	}
}
4835
// flowControlLogicalRaceServer streams itemCount responses of itemSize bytes
// each; used by TestFlowControlLogicalRace.
type flowControlLogicalRaceServer struct {
	testpb.TestServiceServer

	itemSize  int // body size of each streamed response, in bytes
	itemCount int // number of responses to stream per call
}
4842
// StreamingOutputCall sends s.itemCount responses of s.itemSize bytes each,
// stopping at the first Send error (e.g. client cancellation).
func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error {
	for i := 0; i < s.itemCount; i++ {
		err := srv.Send(&testpb.StreamingOutputCallResponse{
			Payload: &testpb.Payload{
				// Sending a large stream of data which the client rejects
				// helps to trigger some types of flow control bugs.
				//
				// Reallocating memory here is inefficient, but the stress it
				// puts on the GC leads to more frequent flow control
				// failures. The GC likely causes more variety in the
				// goroutine scheduling orders.
				Body: bytes.Repeat([]byte("a"), s.itemSize),
			},
		})
		if err != nil {
			return err
		}
	}
	return nil
}
4863
// lockingWriter serializes writes to an underlying io.Writer and allows
// that writer to be swapped at runtime.
type lockingWriter struct {
	mu sync.Mutex
	w  io.Writer
}

// Write forwards p to the current underlying writer while holding the lock.
func (lw *lockingWriter) Write(p []byte) (int, error) {
	lw.mu.Lock()
	n, err := lw.w.Write(p)
	lw.mu.Unlock()
	return n, err
}

// setWriter replaces the underlying writer under the lock.
func (lw *lockingWriter) setWriter(w io.Writer) {
	lw.mu.Lock()
	lw.w = w
	lw.mu.Unlock()
}

// testLogOutput is the destination for log output during tests; it starts
// out pointing at stderr and may be swapped (see declareLogNoise).
var testLogOutput = &lockingWriter{w: os.Stderr}
4882
// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
// terminate, if they're still running. It spams logs with this
// message.  We wait for it so our log filter is still
// active. Otherwise the "defer restore()" at the top of various test
// functions restores our log filter and then the goroutine spams.
func awaitNewConnLogOutput() {
	// Bounded wait: give up after 50ms if the phrase never shows up.
	awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
}
4891
4892func awaitLogOutput(maxWait time.Duration, phrase string) {
4893	pb := []byte(phrase)
4894
4895	timer := time.NewTimer(maxWait)
4896	defer timer.Stop()
4897	wakeup := make(chan bool, 1)
4898	for {
4899		if logOutputHasContents(pb, wakeup) {
4900			return
4901		}
4902		select {
4903		case <-timer.C:
4904			// Too slow. Oh well.
4905			return
4906		case <-wakeup:
4907		}
4908	}
4909}
4910
// logOutputHasContents reports whether the filtered test log output
// currently contains v. If it does not (or filtering is not installed), it
// registers wakeup on the filterWriter so the caller is signaled on the
// next write, and returns false.
func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
	testLogOutput.mu.Lock()
	defer testLogOutput.mu.Unlock()
	// Only meaningful when declareLogNoise has installed a filterWriter.
	fw, ok := testLogOutput.w.(*filterWriter)
	if !ok {
		return false
	}
	fw.mu.Lock()
	defer fw.mu.Unlock()
	if bytes.Contains(fw.buf.Bytes(), v) {
		return true
	}
	// Not found yet: arrange to be woken on the next write.
	fw.wakeup = wakeup
	return false
}
4926
// verboseLogs disables log filtering when set: all grpclog output is shown.
var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering")

// noop is the no-op restore function returned when no filter is installed.
func noop() {}
4930
// declareLogNoise declares that t is expected to emit the following noisy phrases,
// even on success. Those phrases will be filtered from grpclog output
// and only be shown if *verbose_logs or t ends up failing.
// The returned restore function should be called with defer to be run
// before the test ends.
func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
	if *verboseLogs {
		// Filtering disabled: nothing to install, nothing to restore.
		return noop
	}
	fw := &filterWriter{dst: os.Stderr, filter: phrases}
	testLogOutput.setWriter(fw)
	return func() {
		if t.Failed() {
			fw.mu.Lock()
			defer fw.mu.Unlock()
			if fw.buf.Len() > 0 {
				// On failure, surface everything including filtered noise.
				t.Logf("Complete log output:\n%s", fw.buf.Bytes())
			}
		}
		testLogOutput.setWriter(os.Stderr)
	}
}
4953
// filterWriter buffers everything written to it and forwards to dst only
// writes that do not contain one of the configured filter phrases.
type filterWriter struct {
	dst    io.Writer
	filter []string

	mu     sync.Mutex
	buf    bytes.Buffer
	wakeup chan<- bool // if non-nil, gets true on write
}

// Write records p in the buffer, signals any registered waiter, and then
// forwards p to dst unless it matches a filter phrase.
func (fw *filterWriter) Write(p []byte) (int, error) {
	fw.mu.Lock()
	fw.buf.Write(p)
	if fw.wakeup != nil {
		// Non-blocking notification; the wakeup channel is buffered.
		select {
		case fw.wakeup <- true:
		default:
		}
	}
	fw.mu.Unlock()

	text := string(p)
	for _, phrase := range fw.filter {
		if strings.Contains(text, phrase) {
			// Filtered: swallow the output but report success.
			return len(p), nil
		}
	}
	return fw.dst.Write(p)
}
4982
// stubServer is a server that is easy to customize within individual test
// cases.
type stubServer struct {
	// Guarantees we satisfy this interface; panics if unimplemented methods are called.
	testpb.TestServiceServer

	// Customizable implementations of server handlers.
	emptyCall      func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
	unaryCall      func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
	fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error

	// A client connected to this service the test may use.  Created in Start().
	client testpb.TestServiceClient
	cc     *grpc.ClientConn
	s      *grpc.Server

	addr string // address of listener

	cleanups []func() // Lambdas executed in Stop(); populated by Start().

	// r is the manual resolver used to point the client connection at the
	// server and to push service configs during tests.
	r *manual.Resolver
}
5005
// EmptyCall delegates to the test-provided emptyCall handler.
func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
	return ss.emptyCall(ctx, in)
}

// UnaryCall delegates to the test-provided unaryCall handler.
func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
	return ss.unaryCall(ctx, in)
}

// FullDuplexCall delegates to the test-provided fullDuplexCall handler.
func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
	return ss.fullDuplexCall(stream)
}
5017
5018// Start starts the server and creates a client connected to it.
5019func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
5020	r, cleanup := manual.GenerateAndRegisterManualResolver()
5021	ss.r = r
5022	ss.cleanups = append(ss.cleanups, cleanup)
5023
5024	lis, err := net.Listen("tcp", "localhost:0")
5025	if err != nil {
5026		return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err)
5027	}
5028	ss.addr = lis.Addr().String()
5029	ss.cleanups = append(ss.cleanups, func() { lis.Close() })
5030
5031	s := grpc.NewServer(sopts...)
5032	testpb.RegisterTestServiceServer(s, ss)
5033	go s.Serve(lis)
5034	ss.cleanups = append(ss.cleanups, s.Stop)
5035	ss.s = s
5036
5037	target := ss.r.Scheme() + ":///" + ss.addr
5038
5039	opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...)
5040	cc, err := grpc.Dial(target, opts...)
5041	if err != nil {
5042		return fmt.Errorf("grpc.Dial(%q) = %v", target, err)
5043	}
5044	ss.cc = cc
5045	ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}})
5046	if err := ss.waitForReady(cc); err != nil {
5047		return err
5048	}
5049
5050	ss.cleanups = append(ss.cleanups, func() { cc.Close() })
5051
5052	ss.client = testpb.NewTestServiceClient(cc)
5053	return nil
5054}
5055
// newServiceConfig pushes a new service config (JSON form) through the
// manual resolver, along with the server's current address.
func (ss *stubServer) newServiceConfig(sc string) {
	ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: parseCfg(ss.r, sc)})
}
5059
5060func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error {
5061	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
5062	defer cancel()
5063	for {
5064		s := cc.GetState()
5065		if s == connectivity.Ready {
5066			return nil
5067		}
5068		if !cc.WaitForStateChange(ctx, s) {
5069			// ctx got timeout or canceled.
5070			return ctx.Err()
5071		}
5072	}
5073}
5074
5075func (ss *stubServer) Stop() {
5076	for i := len(ss.cleanups) - 1; i >= 0; i-- {
5077		ss.cleanups[i]()
5078	}
5079}
5080
// TestGRPCMethod verifies that grpc.Method reports the full method name
// inside a server handler's context.
func (s) TestGRPCMethod(t *testing.T) {
	var method string
	var ok bool

	ss := &stubServer{
		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			// Capture what grpc.Method reports, for verification below.
			method, ok = grpc.Method(ctx)
			return &testpb.Empty{}, nil
		},
	}
	if err := ss.Start(nil); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, nil", err)
	}

	if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want {
		t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want)
	}
}
5107
// TestUnaryProxyDoesNotForwardMetadata verifies that incoming metadata is
// not implicitly forwarded when a unary handler dials out using its own
// incoming context.
func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
	const mdkey = "somedata"

	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
	endpoint := &stubServer{
		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
				return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
			}
			return &testpb.Empty{}, nil
		},
	}
	if err := endpoint.Start(nil); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer endpoint.Stop()

	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
	// without explicitly copying the metadata.
	proxy := &stubServer{
		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
				return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey)
			}
			return endpoint.client.EmptyCall(ctx, in)
		},
	}
	if err := proxy.Start(nil); err != nil {
		t.Fatalf("Error starting proxy server: %v", err)
	}
	defer proxy.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	md := metadata.Pairs(mdkey, "val")
	ctx = metadata.NewOutgoingContext(ctx, md)

	// Sanity check that endpoint properly errors when it sees mdkey.
	_, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{})
	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
		t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err)
	}

	// The RPC through the proxy must succeed: the proxy sees mdkey, but does
	// not forward it to the endpoint.
	if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatal(err.Error())
	}
}
5155
5156func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
5157	const mdkey = "somedata"
5158
5159	// doFDC performs a FullDuplexCall with client and returns the error from the
5160	// first stream.Recv call, or nil if that error is io.EOF.  Calls t.Fatal if
5161	// the stream cannot be established.
5162	doFDC := func(ctx context.Context, client testpb.TestServiceClient) error {
5163		stream, err := client.FullDuplexCall(ctx)
5164		if err != nil {
5165			t.Fatalf("Unwanted error: %v", err)
5166		}
5167		if _, err := stream.Recv(); err != io.EOF {
5168			return err
5169		}
5170		return nil
5171	}
5172
5173	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
5174	endpoint := &stubServer{
5175		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5176			ctx := stream.Context()
5177			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
5178				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5179			}
5180			return nil
5181		},
5182	}
5183	if err := endpoint.Start(nil); err != nil {
5184		t.Fatalf("Error starting endpoint server: %v", err)
5185	}
5186	defer endpoint.Stop()
5187
5188	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
5189	// without explicitly copying the metadata.
5190	proxy := &stubServer{
5191		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5192			ctx := stream.Context()
5193			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
5194				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5195			}
5196			return doFDC(ctx, endpoint.client)
5197		},
5198	}
5199	if err := proxy.Start(nil); err != nil {
5200		t.Fatalf("Error starting proxy server: %v", err)
5201	}
5202	defer proxy.Stop()
5203
5204	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5205	defer cancel()
5206	md := metadata.Pairs(mdkey, "val")
5207	ctx = metadata.NewOutgoingContext(ctx, md)
5208
5209	// Sanity check that endpoint properly errors when it sees mdkey in ctx.
5210	err := doFDC(ctx, endpoint.client)
5211	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
5212		t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err)
5213	}
5214
5215	if err := doFDC(ctx, proxy.client); err != nil {
5216		t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
5217	}
5218}
5219
// TestStatsTagsAndTrace verifies that stats tags and trace bytes set on the
// client context are transmitted to the server (as grpc-tags-bin /
// grpc-trace-bin metadata) and exposed via stats.Tags / stats.Trace.
func (s) TestStatsTagsAndTrace(t *testing.T) {
	// Data added to context by client (typically in a stats handler).
	tags := []byte{1, 5, 2, 4, 3}
	trace := []byte{5, 2, 1, 3, 4}

	// endpoint ensures Tags() and Trace() in context match those that were added
	// by the client and returns an error if not.
	endpoint := &stubServer{
		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			md, _ := metadata.FromIncomingContext(ctx)
			if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
				return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
			}
			if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
				return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
			}
			if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
				return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
			}
			if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
				return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
			}
			return &testpb.Empty{}, nil
		},
	}
	if err := endpoint.Start(nil); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer endpoint.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Each partially-populated context must fail the endpoint's checks; only
	// the final case (correct tags AND correct trace) should succeed.  Note
	// the fourth case deliberately sets the trace bytes to `tags` so the
	// trace check fails.
	testCases := []struct {
		ctx  context.Context
		want codes.Code
	}{
		{ctx: ctx, want: codes.Internal},
		{ctx: stats.SetTags(ctx, tags), want: codes.Internal},
		{ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
		{ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
		{ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
	}

	for _, tc := range testCases {
		_, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{})
		if tc.want == codes.OK && err != nil {
			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
		}
		if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
		}
	}
}
5274
5275func (s) TestTapTimeout(t *testing.T) {
5276	sopts := []grpc.ServerOption{
5277		grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) {
5278			c, cancel := context.WithCancel(ctx)
5279			// Call cancel instead of setting a deadline so we can detect which error
5280			// occurred -- this cancellation (desired) or the client's deadline
5281			// expired (indicating this cancellation did not affect the RPC).
5282			time.AfterFunc(10*time.Millisecond, cancel)
5283			return c, nil
5284		}),
5285	}
5286
5287	ss := &stubServer{
5288		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5289			<-ctx.Done()
5290			return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
5291		},
5292	}
5293	if err := ss.Start(sopts); err != nil {
5294		t.Fatalf("Error starting endpoint server: %v", err)
5295	}
5296	defer ss.Stop()
5297
5298	// This was known to be flaky; test several times.
5299	for i := 0; i < 10; i++ {
5300		// Set our own deadline in case the server hangs.
5301		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5302		res, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
5303		cancel()
5304		if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
5305			t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err)
5306		}
5307	}
5308
5309}
5310
// TestClientWriteFailsAfterServerClosesStream verifies that client-side
// Send eventually fails with io.EOF after the server terminates the stream
// with an error status.
func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
	ss := &stubServer{
		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
			// End the stream immediately with an error status.
			return status.Errorf(codes.Internal, "")
		},
	}
	sopts := []grpc.ServerOption{}
	if err := ss.Start(sopts); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := ss.client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("Error while creating stream: %v", err)
	}
	for {
		if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil {
			// The server's closure may not be visible yet; back off and retry.
			time.Sleep(5 * time.Millisecond)
		} else if err == io.EOF {
			break // Success.
		} else {
			t.Fatalf("stream.Send(_) = %v, want io.EOF", err)
		}
	}
}
5338
// windowSizeConfig bundles the four HTTP/2 initial window sizes a test can
// configure: per-stream and per-connection, for both server and client.
type windowSizeConfig struct {
	serverStream int32
	serverConn   int32
	clientStream int32
	clientConn   int32
}
5345
// max returns the larger of two int32 values.
func max(a, b int32) int32 {
	if b > a {
		return b
	}
	return a
}
5352
// TestConfigurableWindowSizeWithLargeWindow exercises configurable window
// sizes with multi-megabyte stream and connection windows.
func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
	wc := windowSizeConfig{
		serverStream: 8 * 1024 * 1024,
		serverConn:   12 * 1024 * 1024,
		clientStream: 6 * 1024 * 1024,
		clientConn:   8 * 1024 * 1024,
	}
	for _, e := range listTestEnv() {
		testConfigurableWindowSize(t, e, wc)
	}
}
5364
// TestConfigurableWindowSizeWithSmallWindow exercises configurable window
// sizes with 1-byte stream and connection windows.
func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
	wc := windowSizeConfig{
		serverStream: 1,
		serverConn:   1,
		clientStream: 1,
		clientConn:   1,
	}
	for _, e := range listTestEnv() {
		testConfigurableWindowSize(t, e, wc)
	}
}
5376
// testConfigurableWindowSize verifies a full-duplex exchange completes when
// the configured window sizes are in effect, using messages sized to
// exhaust the largest configured window over numOfIter round trips.
func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
	te := newTest(t, e)
	te.serverInitialWindowSize = wc.serverStream
	te.serverInitialConnWindowSize = wc.serverConn
	te.clientInitialWindowSize = wc.clientStream
	te.clientInitialConnWindowSize = wc.clientConn

	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	stream, err := tc.FullDuplexCall(context.Background())
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	numOfIter := 11
	// Set message size to exhaust largest of window sizes.
	messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
	// Never send less than 64KiB per message, even for tiny windows.
	messageSize = max(messageSize, 64*1024)
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
	if err != nil {
		t.Fatal(err)
	}
	respParams := []*testpb.ResponseParameters{
		{
			Size: messageSize,
		},
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParams,
		Payload:            payload,
	}
	// Ping-pong: each request elicits one equally-sized response.
	for i := 0; i < numOfIter; i++ {
		if err := stream.Send(req); err != nil {
			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
		}
		if _, err := stream.Recv(); err != nil {
			t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
		}
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
	}
}
5423
5424func (s) TestWaitForReadyConnection(t *testing.T) {
5425	for _, e := range listTestEnv() {
5426		testWaitForReadyConnection(t, e)
5427	}
5428
5429}
5430
// testWaitForReadyConnection dials non-blockingly, waits for the connection
// to reach Ready, then issues a fail-fast RPC that must succeed.
func testWaitForReadyConnection(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn() // Non-blocking dial.
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	state := cc.GetState()
	// Wait for connection to be Ready.
	for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
	}
	if state != connectivity.Ready {
		t.Fatalf("Want connection state to be Ready, got %v", state)
	}
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// Make a fail-fast RPC.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err)
	}
}
5455
// errCodec is a codec whose Marshal fails unless noError is set; it is used
// to verify that encoding errors do not panic the server.
type errCodec struct {
	noError bool
}

// Marshal returns an empty message when noError is set and an error otherwise.
func (c *errCodec) Marshal(v interface{}) ([]byte, error) {
	if !c.noError {
		return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12")
	}
	return []byte{}, nil
}

// Unmarshal is a no-op that always succeeds.
func (c *errCodec) Unmarshal(data []byte, v interface{}) error {
	return nil
}

// Name returns the codec's registration name.
func (c *errCodec) Name() string {
	return "Fermat's near-miss."
}
5474
5475func (s) TestEncodeDoesntPanic(t *testing.T) {
5476	for _, e := range listTestEnv() {
5477		testEncodeDoesntPanic(t, e)
5478	}
5479}
5480
// testEncodeDoesntPanic verifies that a codec whose Marshal fails causes the
// RPC to fail without panicking the server, and that the same codec with
// errors disabled allows the RPC to succeed.
func testEncodeDoesntPanic(t *testing.T, e env) {
	te := newTest(t, e)
	erc := &errCodec{}
	te.customCodec = erc
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	// NOTE(review): customCodec is cleared after the server starts,
	// presumably so the client connection below uses the default codec --
	// confirm against te.clientConn.
	te.customCodec = nil
	tc := testpb.NewTestServiceClient(te.clientConn())
	// Failure case, should not panic.
	tc.EmptyCall(context.Background(), &testpb.Empty{})
	erc.noError = true
	// Passing case.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
}
5497
5498func (s) TestSvrWriteStatusEarlyWrite(t *testing.T) {
5499	for _, e := range listTestEnv() {
5500		testSvrWriteStatusEarlyWrite(t, e)
5501	}
5502}
5503
// testSvrWriteStatusEarlyWrite verifies that the server writes a
// ResourceExhausted status both when it receives a message exceeding
// maxServerReceiveMsgSize and when it would send one exceeding
// maxServerSendMsgSize.
func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
	te := newTest(t, e)
	const smallSize = 1024
	const largeSize = 2048
	const extraLargeSize = 4096
	te.maxServerReceiveMsgSize = newInt(largeSize)
	te.maxServerSendMsgSize = newInt(largeSize)
	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}
	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
	if err != nil {
		t.Fatal(err)
	}
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())
	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(smallSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            extraLargePayload,
	}
	// Test recv case: server receives a message larger than maxServerReceiveMsgSize.
	stream, err := tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
	}
	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
	// Test send case: server sends a message larger than maxServerSendMsgSize.
	sreq.Payload = smallPayload
	respParam[0].Size = int32(extraLargeSize)

	stream, err = tc.FullDuplexCall(te.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err = stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}
}
5558
// The following functions with function name ending with TD indicates that they
// should be deleted after old service config API is deprecated and deleted.

// testServiceConfigSetupTD builds a test harness wired to the deprecated
// channel-based service config API and returns the harness plus the
// channel used to push new configs. Note: it does NOT start a server.
func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) {
	te := newTest(t, e)
	// We write before read.
	ch := make(chan grpc.ServiceConfig, 1)
	te.sc = ch
	te.userAgent = testAppUA
	// Suppress known connection-teardown log noise produced by these tests.
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
		"Failed to dial : context canceled; please retry.",
	)
	return te, ch
}
5575
5576func (s) TestServiceConfigGetMethodConfigTD(t *testing.T) {
5577	for _, e := range listTestEnv() {
5578		testGetMethodConfigTD(t, e)
5579	}
5580}
5581
// testGetMethodConfigTD checks, via the deprecated channel-based service
// config API, that an exact method entry overrides the service-wide
// default entry ("/grpc.testing.TestService/"), and that a config update
// swapping the exact entry changes the RPC's behavior accordingly.
func testGetMethodConfigTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	// mc1: wait-for-ready with a 1ms timeout; mc2: fail-fast.
	mc1 := grpc.MethodConfig{
		WaitForReady: newBool(true),
		Timeout:      newDuration(time.Millisecond),
	}
	mc2 := grpc.MethodConfig{WaitForReady: newBool(false)}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc1
	m["/grpc.testing.TestService/"] = mc2
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	// (The TD setup starts no server, so a wait-for-ready RPC blocks until
	// its 1ms timeout fires.)
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}

	// Move the exact-match entry to UnaryCall so that EmptyCall now falls
	// back to the service default mc2 (fail-fast).
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/UnaryCall"] = mc1
	m["/grpc.testing.TestService/"] = mc2
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc
	// Wait for the new service config to propagate.
	for {
		if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
			continue
		}
		break
	}
	// The following RPCs are expected to become fail-fast.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
	}
}
5625
5626func (s) TestServiceConfigWaitForReadyTD(t *testing.T) {
5627	for _, e := range listTestEnv() {
5628		testServiceConfigWaitForReadyTD(t, e)
5629	}
5630}
5631
// testServiceConfigWaitForReadyTD checks the precedence between the
// per-call WaitForReady option and the service config's wait_for_ready
// setting under the deprecated channel-based config API. No server is
// started, so wait-for-ready RPCs block until their 1ms timeout fires.
func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
	mc := grpc.MethodConfig{
		WaitForReady: newBool(false),
		Timeout:      newDuration(time.Millisecond),
	}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}

	// Generate a service config update.
	// Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
	mc.WaitForReady = newBool(true)
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	// Wait for the new service config to take effect.
	// Poll GetMethodConfig until the updated WaitForReady value is visible.
	mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
	for {
		if !*mc.WaitForReady {
			time.Sleep(100 * time.Millisecond)
			mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
			continue
		}
		break
	}
	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
}
5688
5689func (s) TestServiceConfigTimeoutTD(t *testing.T) {
5690	for _, e := range listTestEnv() {
5691		testServiceConfigTimeoutTD(t, e)
5692	}
5693}
5694
// testServiceConfigTimeoutTD checks that the effective RPC timeout is the
// minimum of the per-call deadline and the service config's timeout, in
// both directions, under the deprecated channel-based config API.
func testServiceConfigTimeoutTD(t *testing.T, e env) {
	te, ch := testServiceConfigSetupTD(t, e)
	defer te.tearDown()

	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	mc := grpc.MethodConfig{
		Timeout: newDuration(time.Hour),
	}
	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()
	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
	if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()

	// Generate a service config update.
	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
	mc.Timeout = newDuration(time.Nanosecond)
	m = make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/EmptyCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc = grpc.ServiceConfig{
		Methods: m,
	}
	ch <- sc

	// Wait for the new service config to take effect.
	// Poll GetMethodConfig until the 1ns timeout is visible.
	mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
	for {
		if *mc.Timeout != time.Nanosecond {
			time.Sleep(100 * time.Millisecond)
			mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
			continue
		}
		break
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
	}
	cancel()

	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
	if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
	}
	cancel()
}
5759
5760func (s) TestServiceConfigMaxMsgSizeTD(t *testing.T) {
5761	for _, e := range listTestEnv() {
5762		testServiceConfigMaxMsgSizeTD(t, e)
5763	}
5764}
5765
// testServiceConfigMaxMsgSizeTD checks, under the deprecated channel-based
// service config API, that request/response size limits from the service
// config and the client API combine so that the smaller of the two wins:
// Case1 uses only the config's 2048 limit, Case2 adds a tighter 1024
// client limit, and Case3 adds a looser 4096 client limit.
func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
	// Setting up values and objects shared across all test cases.
	const smallSize = 1
	const largeSize = 1024
	const extraLargeSize = 2048

	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
	if err != nil {
		t.Fatal(err)
	}
	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
	if err != nil {
		t.Fatal(err)
	}
	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
	if err != nil {
		t.Fatal(err)
	}

	mc := grpc.MethodConfig{
		MaxReqSize:  newInt(extraLargeSize),
		MaxRespSize: newInt(extraLargeSize),
	}

	m := make(map[string]grpc.MethodConfig)
	m["/grpc.testing.TestService/UnaryCall"] = mc
	m["/grpc.testing.TestService/FullDuplexCall"] = mc
	sc := grpc.ServiceConfig{
		Methods: m,
	}
	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	te1, ch1 := testServiceConfigSetupTD(t, e)
	te1.startServer(&testServer{security: e.security})
	defer te1.tearDown()

	ch1 <- sc
	tc := testpb.NewTestServiceClient(te1.clientConn())

	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: int32(extraLargeSize),
		Payload:      smallPayload,
	}
	// Test for unary RPC recv.
	// A 2048-byte response equals the limit's boundary and is rejected.
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = extraLargePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	respParam := []*testpb.ResponseParameters{
		{
			Size: int32(extraLargeSize),
		},
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            smallPayload,
	}
	stream, err := tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	// An oversized outbound message is rejected at Send time.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = extraLargePayload
	stream, err = tc.FullDuplexCall(te1.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	// The tighter client limit (1024) is the effective one.
	te2, ch2 := testServiceConfigSetupTD(t, e)
	te2.maxClientReceiveMsgSize = newInt(1024)
	te2.maxClientSendMsgSize = newInt(1024)
	te2.startServer(&testServer{security: e.security})
	defer te2.tearDown()
	ch2 <- sc
	tc = testpb.NewTestServiceClient(te2.clientConn())

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	stream, err = tc.FullDuplexCall(te2.ctx)
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te2.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}

	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
	// The tighter service config limit (2048) is the effective one: 1024
	// succeeds, 2048 fails.
	te3, ch3 := testServiceConfigSetupTD(t, e)
	te3.maxClientReceiveMsgSize = newInt(4096)
	te3.maxClientSendMsgSize = newInt(4096)
	te3.startServer(&testServer{security: e.security})
	defer te3.tearDown()
	ch3 <- sc
	tc = testpb.NewTestServiceClient(te3.clientConn())

	// Test for unary RPC recv.
	req.Payload = smallPayload
	req.ResponseSize = int32(largeSize)

	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.ResponseSize = int32(extraLargeSize)
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for unary RPC send.
	req.Payload = largePayload
	req.ResponseSize = int32(smallSize)
	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
	}

	req.Payload = extraLargePayload
	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
	}

	// Test for streaming RPC recv.
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam[0].Size = int32(largeSize)
	sreq.Payload = smallPayload

	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
	}

	respParam[0].Size = int32(extraLargeSize)

	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
	}

	// Test for streaming RPC send.
	respParam[0].Size = int32(smallSize)
	sreq.Payload = largePayload
	stream, err = tc.FullDuplexCall(te3.ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	sreq.Payload = extraLargePayload
	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
	}
}
5976
5977func (s) TestMethodFromServerStream(t *testing.T) {
5978	const testMethod = "/package.service/method"
5979	e := tcpClearRREnv
5980	te := newTest(t, e)
5981	var method string
5982	var ok bool
5983	te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error {
5984		method, ok = grpc.MethodFromServerStream(stream)
5985		return nil
5986	}
5987
5988	te.startServer(nil)
5989	defer te.tearDown()
5990	_ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil)
5991	if !ok || method != testMethod {
5992		t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)
5993	}
5994}
5995
// TestInterceptorCanAccessCallOptions verifies that unary and stream
// client interceptors receive the full, merged set of call options:
// dial-time defaults (WithDefaultCallOptions) followed by per-call
// options, in that order.
func (s) TestInterceptorCanAccessCallOptions(t *testing.T) {
	e := tcpClearRREnv
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	// observedOptions accumulates, per option kind, the values seen by the
	// interceptors.
	type observedOptions struct {
		headers     []*metadata.MD
		trailers    []*metadata.MD
		peer        []*peer.Peer
		creds       []credentials.PerRPCCredentials
		failFast    []bool
		maxRecvSize []int
		maxSendSize []int
		compressor  []string
		subtype     []string
	}
	var observedOpts observedOptions
	populateOpts := func(opts []grpc.CallOption) {
		for _, o := range opts {
			switch o := o.(type) {
			case grpc.HeaderCallOption:
				observedOpts.headers = append(observedOpts.headers, o.HeaderAddr)
			case grpc.TrailerCallOption:
				observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr)
			case grpc.PeerCallOption:
				observedOpts.peer = append(observedOpts.peer, o.PeerAddr)
			case grpc.PerRPCCredsCallOption:
				observedOpts.creds = append(observedOpts.creds, o.Creds)
			case grpc.FailFastCallOption:
				observedOpts.failFast = append(observedOpts.failFast, o.FailFast)
			case grpc.MaxRecvMsgSizeCallOption:
				observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize)
			case grpc.MaxSendMsgSizeCallOption:
				observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize)
			case grpc.CompressorCallOption:
				observedOpts.compressor = append(observedOpts.compressor, o.CompressorType)
			case grpc.ContentSubtypeCallOption:
				observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype)
			}
		}
	}

	// The interceptors short-circuit the RPCs: they only record options.
	te.unaryClientInt = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
		populateOpts(opts)
		return nil
	}
	te.streamClientInt = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
		populateOpts(opts)
		return nil, nil
	}

	defaults := []grpc.CallOption{
		grpc.WaitForReady(true),
		grpc.MaxCallRecvMsgSize(1010),
	}
	tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))

	var headers metadata.MD
	var trailers metadata.MD
	var pr peer.Peer
	tc.UnaryCall(context.Background(), &testpb.SimpleRequest{},
		grpc.MaxCallRecvMsgSize(100),
		grpc.MaxCallSendMsgSize(200),
		grpc.PerRPCCredentials(testPerRPCCredentials{}),
		grpc.Header(&headers),
		grpc.Trailer(&trailers),
		grpc.Peer(&pr))
	expected := observedOptions{
		// WaitForReady(true) surfaces as FailFast=false; the default 1010
		// recv size precedes the per-call 100.
		failFast:    []bool{false},
		maxRecvSize: []int{1010, 100},
		maxSendSize: []int{200},
		creds:       []credentials.PerRPCCredentials{testPerRPCCredentials{}},
		headers:     []*metadata.MD{&headers},
		trailers:    []*metadata.MD{&trailers},
		peer:        []*peer.Peer{&pr},
	}

	if !reflect.DeepEqual(expected, observedOpts) {
		t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
	}

	observedOpts = observedOptions{} // reset

	tc.StreamingInputCall(context.Background(),
		grpc.WaitForReady(false),
		grpc.MaxCallSendMsgSize(2020),
		grpc.UseCompressor("comp-type"),
		grpc.CallContentSubtype("json"))
	expected = observedOptions{
		// Default WaitForReady(true) then per-call WaitForReady(false):
		// both appear, as FailFast false then true.
		failFast:    []bool{false, true},
		maxRecvSize: []int{1010},
		maxSendSize: []int{2020},
		compressor:  []string{"comp-type"},
		subtype:     []string{"json"},
	}

	if !reflect.DeepEqual(expected, observedOpts) {
		t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
	}
}
6097
6098func (s) TestCompressorRegister(t *testing.T) {
6099	for _, e := range listTestEnv() {
6100		testCompressorRegister(t, e)
6101	}
6102}
6103
// testCompressorRegister exercises a client configured to use a
// registered compressor (clientUseCompression) on both a unary RPC with
// large compressible payloads and a full-duplex streaming RPC, expecting
// every call to succeed.
func testCompressorRegister(t *testing.T, e env) {
	te := newTest(t, e)
	te.clientCompression = false
	te.serverCompression = false
	te.clientUseCompression = true

	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	tc := testpb.NewTestServiceClient(te.clientConn())

	// Unary call
	const argSize = 271828
	const respSize = 314159
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.SimpleRequest{
		ResponseType: testpb.PayloadType_COMPRESSABLE,
		ResponseSize: respSize,
		Payload:      payload,
	}
	// Attach arbitrary outgoing metadata alongside the compressed request.
	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
	if _, err := tc.UnaryCall(ctx, req); err != nil {
		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
	}
	// Streaming RPC
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 31415,
		},
	}
	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
	if err != nil {
		t.Fatal(err)
	}
	sreq := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	if err := stream.Send(sreq); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
	}
}
6158
6159func (s) TestServeExitsWhenListenerClosed(t *testing.T) {
6160	ss := &stubServer{
6161		emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
6162			return &testpb.Empty{}, nil
6163		},
6164	}
6165
6166	s := grpc.NewServer()
6167	defer s.Stop()
6168	testpb.RegisterTestServiceServer(s, ss)
6169
6170	lis, err := net.Listen("tcp", "localhost:0")
6171	if err != nil {
6172		t.Fatalf("Failed to create listener: %v", err)
6173	}
6174
6175	done := make(chan struct{})
6176	go func() {
6177		s.Serve(lis)
6178		close(done)
6179	}()
6180
6181	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
6182	if err != nil {
6183		t.Fatalf("Failed to dial server: %v", err)
6184	}
6185	defer cc.Close()
6186	c := testpb.NewTestServiceClient(cc)
6187	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6188	defer cancel()
6189	if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
6190		t.Fatalf("Failed to send test RPC to server: %v", err)
6191	}
6192
6193	if err := lis.Close(); err != nil {
6194		t.Fatalf("Failed to close listener: %v", err)
6195	}
6196	const timeout = 5 * time.Second
6197	timer := time.NewTimer(timeout)
6198	select {
6199	case <-done:
6200		return
6201	case <-timer.C:
6202		t.Fatalf("Serve did not return after %v", timeout)
6203	}
6204}
6205
6206// Service handler returns status with invalid utf8 message.
6207func (s) TestStatusInvalidUTF8Message(t *testing.T) {
6208	var (
6209		origMsg = string([]byte{0xff, 0xfe, 0xfd})
6210		wantMsg = "���"
6211	)
6212
6213	ss := &stubServer{
6214		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
6215			return nil, status.Errorf(codes.Internal, origMsg)
6216		},
6217	}
6218	if err := ss.Start(nil); err != nil {
6219		t.Fatalf("Error starting endpoint server: %v", err)
6220	}
6221	defer ss.Stop()
6222
6223	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
6224	defer cancel()
6225
6226	if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg {
6227		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg)
6228	}
6229}
6230
// Service handler returns status with details and invalid utf8 message. Proto
// will fail to marshal the status because of the invalid utf8 message. Details
// will be dropped when sending.
func (s) TestStatusInvalidUTF8Details(t *testing.T) {
	// The transport is expected to log this marshal failure; register the
	// expectation so the test logger doesn't fail the test.
	grpctest.TLogger.ExpectError("transport: failed to marshal rpc status")

	var (
		origMsg = string([]byte{0xff, 0xfe, 0xfd})
		wantMsg = "���"
	)

	ss := &stubServer{
		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			// Attach a detail message so there is something to drop when
			// proto marshaling of the status fails.
			st := status.New(codes.Internal, origMsg)
			st, err := st.WithDetails(&testpb.Empty{})
			if err != nil {
				return nil, err
			}
			return nil, st.Err()
		},
	}
	if err := ss.Start(nil); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	_, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
	st := status.Convert(err)
	if st.Message() != wantMsg {
		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg)
	}
	if len(st.Details()) != 0 {
		// Details should be dropped on the server side.
		t.Fatalf("RPC status contain details: %v, want no details", st.Details())
	}
}
6270
6271func (s) TestClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T) {
6272	for _, e := range listTestEnv() {
6273		if e.httpHandler {
6274			continue
6275		}
6276		testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t, e)
6277	}
6278}
6279
6280func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e env) {
6281	te := newTest(t, e)
6282	te.userAgent = testAppUA
6283	smallSize := 1024
6284	te.maxServerReceiveMsgSize = &smallSize
6285	te.startServer(&testServer{security: e.security})
6286	defer te.tearDown()
6287	tc := testpb.NewTestServiceClient(te.clientConn())
6288	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576)
6289	if err != nil {
6290		t.Fatal(err)
6291	}
6292	req := &testpb.SimpleRequest{
6293		ResponseType: testpb.PayloadType_COMPRESSABLE,
6294		Payload:      payload,
6295	}
6296	var wg sync.WaitGroup
6297	for i := 0; i < 10; i++ {
6298		wg.Add(1)
6299		go func() {
6300			defer wg.Done()
6301			for j := 0; j < 100; j++ {
6302				ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
6303				defer cancel()
6304				if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted {
6305					t.Errorf("TestService/UnaryCall(_,_) = _. %v, want code: %s", err, codes.ResourceExhausted)
6306					return
6307				}
6308			}
6309		}()
6310	}
6311	wg.Wait()
6312}
6313
6314func (s) TestRPCTimeout(t *testing.T) {
6315	for _, e := range listTestEnv() {
6316		testRPCTimeout(t, e)
6317	}
6318}
6319
6320func testRPCTimeout(t *testing.T, e env) {
6321	te := newTest(t, e)
6322	te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
6323	defer te.tearDown()
6324
6325	cc := te.clientConn()
6326	tc := testpb.NewTestServiceClient(cc)
6327
6328	const argSize = 2718
6329	const respSize = 314
6330
6331	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
6332	if err != nil {
6333		t.Fatal(err)
6334	}
6335
6336	req := &testpb.SimpleRequest{
6337		ResponseType: testpb.PayloadType_COMPRESSABLE,
6338		ResponseSize: respSize,
6339		Payload:      payload,
6340	}
6341	for i := -1; i <= 10; i++ {
6342		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
6343		if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
6344			t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
6345		}
6346		cancel()
6347	}
6348}
6349
// TestDisabledIOBuffers verifies that streaming RPCs carrying large messages
// still work end-to-end when both the write buffer and the read buffer are
// set to size 0 on the client and the server.
func (s) TestDisabledIOBuffers(t *testing.T) {
	// 60000-byte payload: large enough to span multiple HTTP/2 DATA frames.
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
	if err != nil {
		t.Fatalf("Failed to create payload: %v", err)
	}
	req := &testpb.StreamingOutputCallRequest{
		Payload: payload,
	}
	resp := &testpb.StreamingOutputCallResponse{
		Payload: payload,
	}

	// Echo-style handler: verify each received payload, then send resp back.
	ss := &stubServer{
		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
			for {
				in, err := stream.Recv()
				if err == io.EOF {
					return nil
				}
				if err != nil {
					t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
					return err
				}
				if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
					t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
					return err
				}
				if err := stream.Send(resp); err != nil {
					t.Errorf("stream.Send(_)= %v, want <nil>", err)
					return err
				}

			}
		},
	}

	// Zero-sized transport buffers on the server side.
	s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
	testpb.RegisterTestServiceServer(s, ss)

	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Failed to create listener: %v", err)
	}

	done := make(chan struct{})
	go func() {
		s.Serve(lis)
		close(done)
	}()
	defer s.Stop()
	dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer dcancel()
	// Zero-sized transport buffers on the client side as well.
	cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
	if err != nil {
		t.Fatalf("Failed to dial server")
	}
	defer cc.Close()
	c := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("Failed to send test RPC to server")
	}
	// Ping-pong ten large messages; each echo must round-trip intact.
	for i := 0; i < 10; i++ {
		if err := stream.Send(req); err != nil {
			t.Fatalf("stream.Send(_) = %v, want <nil>", err)
		}
		in, err := stream.Recv()
		if err != nil {
			t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
		}
		if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
			t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
		}
	}
	stream.CloseSend()
	if _, err := stream.Recv(); err != io.EOF {
		t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
	}
}
6431
6432func (s) TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) {
6433	for _, e := range listTestEnv() {
6434		if e.httpHandler {
6435			continue
6436		}
6437		testServerMaxHeaderListSizeClientUserViolation(t, e)
6438	}
6439}
6440
6441func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) {
6442	te := newTest(t, e)
6443	te.maxServerHeaderListSize = new(uint32)
6444	*te.maxServerHeaderListSize = 216
6445	te.startServer(&testServer{security: e.security})
6446	defer te.tearDown()
6447
6448	cc := te.clientConn()
6449	tc := testpb.NewTestServiceClient(cc)
6450	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6451	defer cancel()
6452	metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216)))
6453	var err error
6454	if err = verifyResultWithDelay(func() (bool, error) {
6455		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
6456			return true, nil
6457		}
6458		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
6459	}); err != nil {
6460		t.Fatal(err)
6461	}
6462}
6463
6464func (s) TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) {
6465	for _, e := range listTestEnv() {
6466		if e.httpHandler {
6467			continue
6468		}
6469		testClientMaxHeaderListSizeServerUserViolation(t, e)
6470	}
6471}
6472
6473func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) {
6474	te := newTest(t, e)
6475	te.maxClientHeaderListSize = new(uint32)
6476	*te.maxClientHeaderListSize = 1 // any header server sends will violate
6477	te.startServer(&testServer{security: e.security})
6478	defer te.tearDown()
6479
6480	cc := te.clientConn()
6481	tc := testpb.NewTestServiceClient(cc)
6482	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6483	defer cancel()
6484	var err error
6485	if err = verifyResultWithDelay(func() (bool, error) {
6486		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
6487			return true, nil
6488		}
6489		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
6490	}); err != nil {
6491		t.Fatal(err)
6492	}
6493}
6494
6495func (s) TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) {
6496	for _, e := range listTestEnv() {
6497		if e.httpHandler || e.security == "tls" {
6498			continue
6499		}
6500		testServerMaxHeaderListSizeClientIntentionalViolation(t, e)
6501	}
6502}
6503
// testServerMaxHeaderListSizeClientIntentionalViolation bypasses the normal
// client codepath and writes a raw HEADERS frame whose encoded size exceeds
// the server's 512-byte limit, then expects the stream to fail with
// codes.Internal.
func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) {
	te := newTest(t, e)
	te.maxServerHeaderListSize = new(uint32)
	*te.maxServerHeaderListSize = 512
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	// dw exposes the raw client connection so frames can be written directly.
	cc, dw := te.clientConnWithConnControl()
	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
	}
	rcw := dw.getRawConnWrapper()
	// 512 one-character strings; joined into a single header value this
	// exceeds the server's 512-byte header-list limit.
	val := make([]string, 512)
	for i := range val {
		val[i] = "a"
	}
	// allow for client to send the initial header
	time.Sleep(100 * time.Millisecond)
	rcw.writeHeaders(http2.HeadersFrameParam{
		StreamID:      tc.getCurrentStreamID(),
		BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")),
		EndStream:     false,
		EndHeaders:    true,
	})
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
	}
}
6536
6537func (s) TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) {
6538	for _, e := range listTestEnv() {
6539		if e.httpHandler || e.security == "tls" {
6540			continue
6541		}
6542		testClientMaxHeaderListSizeServerIntentionalViolation(t, e)
6543	}
6544}
6545
// testClientMaxHeaderListSizeServerIntentionalViolation has the server write
// a raw HEADERS frame larger than the client's 200-byte limit and expects the
// client stream to fail with codes.Internal.
func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) {
	te := newTest(t, e)
	te.maxClientHeaderListSize = new(uint32)
	*te.maxClientHeaderListSize = 200
	// setHeaderOnly: the server sends only headers for this test — see the
	// testServer implementation for the exact behavior.
	lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true})
	defer te.tearDown()
	cc, _ := te.clientConnWithConnControl()
	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
	}
	// Poll (up to 100 * 10ms = 1s) for the server-side raw connection to
	// become available.
	var i int
	var rcw *rawConnWrapper
	for i = 0; i < 100; i++ {
		rcw = lw.getLastConn()
		if rcw != nil {
			break
		}
		time.Sleep(10 * time.Millisecond)
		continue
	}
	// i == 100 means the loop above never found a connection.
	if i == 100 {
		t.Fatalf("failed to create server transport after 1s")
	}

	// 200 one-character strings; joined into a single header value this
	// exceeds the client's 200-byte header-list limit.
	val := make([]string, 200)
	for i := range val {
		val[i] = "a"
	}
	// allow for client to send the initial header.
	time.Sleep(100 * time.Millisecond)
	rcw.writeHeaders(http2.HeadersFrameParam{
		StreamID:      tc.getCurrentStreamID(),
		BlockFragment: rcw.encodeRawHeader("oversize", strings.Join(val, "")),
		EndStream:     false,
		EndHeaders:    true,
	})
	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
	}
}
6590
6591func (s) TestNetPipeConn(t *testing.T) {
6592	// This test will block indefinitely if grpc writes both client and server
6593	// prefaces without either reading from the Conn.
6594	pl := testutils.NewPipeListener()
6595	s := grpc.NewServer()
6596	defer s.Stop()
6597	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6598		return &testpb.SimpleResponse{}, nil
6599	}}
6600	testpb.RegisterTestServiceServer(s, ts)
6601	go s.Serve(pl)
6602	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6603	defer cancel()
6604	cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithDialer(pl.Dialer()))
6605	if err != nil {
6606		t.Fatalf("Error creating client: %v", err)
6607	}
6608	defer cc.Close()
6609	client := testpb.NewTestServiceClient(cc)
6610	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6611		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
6612	}
6613}
6614
6615func (s) TestLargeTimeout(t *testing.T) {
6616	for _, e := range listTestEnv() {
6617		testLargeTimeout(t, e)
6618	}
6619}
6620
6621func testLargeTimeout(t *testing.T, e env) {
6622	te := newTest(t, e)
6623	te.declareLogNoise("Server.processUnaryRPC failed to write status")
6624
6625	ts := &funcServer{}
6626	te.startServer(ts)
6627	defer te.tearDown()
6628	tc := testpb.NewTestServiceClient(te.clientConn())
6629
6630	timeouts := []time.Duration{
6631		time.Duration(math.MaxInt64), // will be (correctly) converted to
6632		// 2562048 hours, which overflows upon converting back to an int64
6633		2562047 * time.Hour, // the largest timeout that does not overflow
6634	}
6635
6636	for i, maxTimeout := range timeouts {
6637		ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6638			deadline, ok := ctx.Deadline()
6639			timeout := time.Until(deadline)
6640			minTimeout := maxTimeout - 5*time.Second
6641			if !ok || timeout < minTimeout || timeout > maxTimeout {
6642				t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
6643				return nil, status.Error(codes.OutOfRange, "deadline error")
6644			}
6645			return &testpb.SimpleResponse{}, nil
6646		}
6647
6648		ctx, cancel := context.WithTimeout(context.Background(), maxTimeout)
6649		defer cancel()
6650
6651		if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6652			t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err)
6653		}
6654	}
6655}
6656
// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
// test ensures that the connection is re-created after GO_AWAY and not affected by the
// subsequent (old) connection closure.
func (s) TestGoAwayThenClose(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	// Server 1: the connection that will receive GO_AWAY and then be closed.
	lis1, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s1 := grpc.NewServer()
	defer s1.Stop()
	ts := &funcServer{
		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{}, nil
		},
		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
			// Wait forever.
			_, err := stream.Recv()
			if err == nil {
				t.Error("expected to never receive any message")
			}
			return err
		},
	}
	testpb.RegisterTestServiceServer(s1, ts)
	go s1.Serve(lis1)

	// Server 2: the fallback connection; its listener fires conn2Established
	// when a connection is accepted.
	conn2Established := grpcsync.NewEvent()
	lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s2 := grpc.NewServer()
	defer s2.Stop()
	testpb.RegisterTestServiceServer(s2, ts)
	go s2.Serve(lis2)

	// Start the client resolved only to server 1.
	r, rcleanup := manual.GenerateAndRegisterManualResolver()
	defer rcleanup()
	r.InitialState(resolver.State{Addresses: []resolver.Address{
		{Addr: lis1.Addr().String()},
	}})
	cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer cc.Close()

	client := testpb.NewTestServiceClient(cc)

	// Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the
	// connection doesn't get closed until the server stops and the client receives.
	stream, err := client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
	}

	// Make server 2 available before sending GO_AWAY on connection 1.
	r.UpdateState(resolver.State{Addresses: []resolver.Address{
		{Addr: lis1.Addr().String()},
		{Addr: lis2.Addr().String()},
	}})

	// Send GO_AWAY to connection 1.
	go s1.GracefulStop()

	// Wait for connection 2 to be established.
	<-conn2Established.Done()

	// Close connection 1.
	s1.Stop()

	// Wait for client to close.
	_, err = stream.Recv()
	if err == nil {
		t.Fatal("expected the stream to die, but got a successful Recv")
	}

	// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
	for i := 0; i < 10; i++ {
		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
		}
	}
}
6743
6744func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
6745	lis, err := net.Listen(network, address)
6746	if err != nil {
6747		return nil, err
6748	}
6749	return notifyingListener{connEstablished: event, Listener: lis}, nil
6750}
6751
// notifyingListener wraps a net.Listener and fires connEstablished each time
// Accept returns, letting tests wait until the server has picked up a
// connection.
type notifyingListener struct {
	connEstablished *grpcsync.Event
	net.Listener
}

// Accept accepts the next connection; the deferred Fire runs after the
// embedded Accept returns (whether it succeeded or failed).
func (lis notifyingListener) Accept() (net.Conn, error) {
	defer lis.connEstablished.Fire()
	return lis.Listener.Accept()
}
6761
6762func (s) TestRPCWaitsForResolver(t *testing.T) {
6763	te := testServiceConfigSetup(t, tcpClearRREnv)
6764	te.startServer(&testServer{security: tcpClearRREnv.security})
6765	defer te.tearDown()
6766	r, rcleanup := manual.GenerateAndRegisterManualResolver()
6767	defer rcleanup()
6768
6769	te.resolverScheme = r.Scheme()
6770	te.nonBlockingDial = true
6771	cc := te.clientConn()
6772	tc := testpb.NewTestServiceClient(cc)
6773
6774	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
6775	defer cancel()
6776	// With no resolved addresses yet, this will timeout.
6777	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
6778		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
6779	}
6780
6781	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
6782	defer cancel()
6783	go func() {
6784		time.Sleep(time.Second)
6785		r.UpdateState(resolver.State{
6786			Addresses: []resolver.Address{{Addr: te.srvAddr}},
6787			ServiceConfig: parseCfg(r, `{
6788		    "methodConfig": [
6789		        {
6790		            "name": [
6791		                {
6792		                    "service": "grpc.testing.TestService",
6793		                    "method": "UnaryCall"
6794		                }
6795		            ],
6796                    "maxRequestMessageBytes": 0
6797		        }
6798		    ]
6799		}`)})
6800	}()
6801	// We wait a second before providing a service config and resolving
6802	// addresses.  So this will wait for that and then honor the
6803	// maxRequestMessageBytes it contains.
6804	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{ResponseType: testpb.PayloadType_UNCOMPRESSABLE}); status.Code(err) != codes.ResourceExhausted {
6805		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
6806	}
6807	if got := ctx.Err(); got != nil {
6808		t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got)
6809	}
6810	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6811		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
6812	}
6813}
6814
6815func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) {
6816	// Non-gRPC content-type fallback path.
6817	for httpCode := range transport.HTTPStatusConvTab {
6818		doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{
6819			":status", fmt.Sprintf("%d", httpCode),
6820			"content-type", "text/html", // non-gRPC content type to switch to HTTP mode.
6821			"grpc-status", "1", // Make up a gRPC status error
6822			"grpc-status-details-bin", "???", // Make up a gRPC field parsing error
6823		})
6824	}
6825
6826	// Missing content-type fallback path.
6827	for httpCode := range transport.HTTPStatusConvTab {
6828		doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{
6829			":status", fmt.Sprintf("%d", httpCode),
6830			// Omitting content type to switch to HTTP mode.
6831			"grpc-status", "1", // Make up a gRPC status error
6832			"grpc-status-details-bin", "???", // Make up a gRPC field parsing error
6833		})
6834	}
6835
6836	// Malformed HTTP status when fallback.
6837	doHTTPHeaderTest(t, codes.Internal, []string{
6838		":status", "abc",
6839		// Omitting content type to switch to HTTP mode.
6840		"grpc-status", "1", // Make up a gRPC status error
6841		"grpc-status-details-bin", "???", // Make up a gRPC field parsing error
6842	})
6843}
6844
// Testing erroneous ResponseHeader or Trailers-only (delivered in the first HEADERS frame).
// Each case sends a single HEADERS frame and asserts the status code the
// client surfaces on Recv.
func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) {
	for _, test := range []struct {
		header  []string
		errCode codes.Code
	}{
		{
			// missing gRPC status.
			header: []string{
				":status", "403",
				"content-type", "application/grpc",
			},
			errCode: codes.Unknown,
		},
		{
			// malformed grpc-status.
			header: []string{
				":status", "502",
				"content-type", "application/grpc",
				"grpc-status", "abc",
			},
			errCode: codes.Internal,
		},
		{
			// Malformed grpc-tags-bin field.
			header: []string{
				":status", "502",
				"content-type", "application/grpc",
				"grpc-status", "0",
				"grpc-tags-bin", "???",
			},
			errCode: codes.Internal,
		},
		{
			// gRPC status error.
			header: []string{
				":status", "502",
				"content-type", "application/grpc",
				"grpc-status", "3",
			},
			errCode: codes.InvalidArgument,
		},
	} {
		doHTTPHeaderTest(t, test.errCode, test.header)
	}
}
6891
// Testing non-Trailers-only Trailers (delivered in second HEADERS frame).
// Each case sends a valid-looking response HEADERS frame followed by a broken
// trailer frame and asserts the status code the client surfaces on Recv.
func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) {
	for _, test := range []struct {
		responseHeader []string
		trailer        []string
		errCode        codes.Code
	}{
		{
			responseHeader: []string{
				":status", "200",
				"content-type", "application/grpc",
			},
			trailer: []string{
				// trailer missing grpc-status
				":status", "502",
			},
			errCode: codes.Unknown,
		},
		{
			responseHeader: []string{
				":status", "404",
				"content-type", "application/grpc",
			},
			trailer: []string{
				// malformed grpc-status-details-bin field
				"grpc-status", "0",
				"grpc-status-details-bin", "????",
			},
			errCode: codes.Internal,
		},
	} {
		doHTTPHeaderTest(t, test.errCode, test.responseHeader, test.trailer)
	}
}
6926
6927func (s) TestHTTPHeaderFrameErrorHandlingMoreThanTwoHeaders(t *testing.T) {
6928	header := []string{
6929		":status", "200",
6930		"content-type", "application/grpc",
6931	}
6932	doHTTPHeaderTest(t, codes.Internal, header, header, header)
6933}
6934
6935type httpServer struct {
6936	headerFields [][]string
6937}
6938
6939func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error {
6940	if len(headerFields)%2 == 1 {
6941		panic("odd number of kv args")
6942	}
6943
6944	var buf bytes.Buffer
6945	henc := hpack.NewEncoder(&buf)
6946	for len(headerFields) > 0 {
6947		k, v := headerFields[0], headerFields[1]
6948		headerFields = headerFields[2:]
6949		henc.WriteField(hpack.HeaderField{Name: k, Value: v})
6950	}
6951
6952	return framer.WriteHeaders(http2.HeadersFrameParam{
6953		StreamID:      sid,
6954		BlockFragment: buf.Bytes(),
6955		EndStream:     endStream,
6956		EndHeaders:    true,
6957	})
6958}
6959
// start launches a goroutine that accepts a single connection, performs a
// minimal HTTP/2 server-side handshake, waits for the client's first HEADERS
// frame, and then replays s.headerFields as HEADERS frames on that stream.
func (s *httpServer) start(t *testing.T, lis net.Listener) {
	// Launch an HTTP server to send back header.
	go func() {
		conn, err := lis.Accept()
		if err != nil {
			t.Errorf("Error accepting connection: %v", err)
			return
		}
		defer conn.Close()
		// Read preface sent by client.
		if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil {
			t.Errorf("Error at server-side while reading preface from client. Err: %v", err)
			return
		}
		reader := bufio.NewReader(conn)
		writer := bufio.NewWriter(conn)
		framer := http2.NewFramer(writer, reader)
		if err = framer.WriteSettingsAck(); err != nil {
			t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
			return
		}
		writer.Flush() // necessary since client is expecting preface before declaring connection fully setup.

		var sid uint32
		// Read frames until a header is received.
		for {
			frame, err := framer.ReadFrame()
			if err != nil {
				t.Errorf("Error at server-side while reading frame. Err: %v", err)
				return
			}
			if hframe, ok := frame.(*http2.HeadersFrame); ok {
				sid = hframe.Header().StreamID
				break
			}
		}
		// Replay the canned header sequence; only the last frame ends the
		// stream. Flush after each write so the client sees frames promptly.
		for i, headers := range s.headerFields {
			if err = s.writeHeader(framer, sid, headers, i == len(s.headerFields)-1); err != nil {
				t.Errorf("Error at server-side while writing headers. Err: %v", err)
				return
			}
			writer.Flush()
		}
	}()
}
7005
7006func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string) {
7007	t.Helper()
7008	lis, err := net.Listen("tcp", "localhost:0")
7009	if err != nil {
7010		t.Fatalf("Failed to listen. Err: %v", err)
7011	}
7012	defer lis.Close()
7013	server := &httpServer{
7014		headerFields: headerFields,
7015	}
7016	server.start(t, lis)
7017	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
7018	if err != nil {
7019		t.Fatalf("failed to dial due to err: %v", err)
7020	}
7021	defer cc.Close()
7022	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
7023	defer cancel()
7024	client := testpb.NewTestServiceClient(cc)
7025	stream, err := client.FullDuplexCall(ctx)
7026	if err != nil {
7027		t.Fatalf("error creating stream due to err: %v", err)
7028	}
7029	if _, err := stream.Recv(); err == nil || status.Code(err) != errCode {
7030		t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode)
7031	}
7032}
7033
7034func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
7035	g := r.CC.ParseServiceConfig(s)
7036	if g.Err != nil {
7037		panic(fmt.Sprintf("Error parsing config %q: %v", s, g.Err))
7038	}
7039	return g
7040}
7041
7042func (s) TestClientCancellationPropagatesUnary(t *testing.T) {
7043	wg := &sync.WaitGroup{}
7044	called, done := make(chan struct{}), make(chan struct{})
7045	ss := &stubServer{
7046		emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
7047			close(called)
7048			<-ctx.Done()
7049			err := ctx.Err()
7050			if err != context.Canceled {
7051				t.Errorf("ctx.Err() = %v; want context.Canceled", err)
7052			}
7053			close(done)
7054			return nil, err
7055		},
7056	}
7057	if err := ss.Start(nil); err != nil {
7058		t.Fatalf("Error starting endpoint server: %v", err)
7059	}
7060	defer ss.Stop()
7061
7062	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
7063
7064	wg.Add(1)
7065	go func() {
7066		if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled {
7067			t.Errorf("ss.client.EmptyCall() = _, %v; want _, Code()=codes.Canceled", err)
7068		}
7069		wg.Done()
7070	}()
7071
7072	select {
7073	case <-called:
7074	case <-time.After(5 * time.Second):
7075		t.Fatalf("failed to perform EmptyCall after 10s")
7076	}
7077	cancel()
7078	select {
7079	case <-done:
7080	case <-time.After(5 * time.Second):
7081		t.Fatalf("server failed to close done chan due to cancellation propagation")
7082	}
7083	wg.Wait()
7084}
7085
7086type badGzipCompressor struct{}
7087
7088func (badGzipCompressor) Do(w io.Writer, p []byte) error {
7089	buf := &bytes.Buffer{}
7090	gzw := gzip.NewWriter(buf)
7091	if _, err := gzw.Write(p); err != nil {
7092		return err
7093	}
7094	err := gzw.Close()
7095	bs := buf.Bytes()
7096	if len(bs) >= 6 {
7097		bs[len(bs)-6] ^= 1 // modify checksum at end by 1 byte
7098	}
7099	w.Write(bs)
7100	return err
7101}
7102
7103func (badGzipCompressor) Type() string {
7104	return "gzip"
7105}
7106
// TestGzipBadChecksum sends a request compressed by badGzipCompressor (which
// corrupts the gzip trailer checksum) and verifies the server rejects it with
// an Internal status whose message mentions gzip.ErrChecksum.
func (s) TestGzipBadChecksum(t *testing.T) {
	ss := &stubServer{
		unaryCall: func(ctx context.Context, _ *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{}, nil
		},
	}
	if err := ss.Start(nil, grpc.WithCompressor(badGzipCompressor{})); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Compressible payload so the (corrupted) compressed path is exercised.
	p, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1024))
	if err != nil {
		t.Fatalf("Unexpected error from newPayload: %v", err)
	}
	if _, err := ss.client.UnaryCall(ctx, &testpb.SimpleRequest{Payload: p}); err == nil ||
		status.Code(err) != codes.Internal ||
		!strings.Contains(status.Convert(err).Message(), gzip.ErrChecksum.Error()) {
		t.Errorf("ss.client.UnaryCall(_) = _, %v\n\twant: _, status(codes.Internal, contains %q)", err, gzip.ErrChecksum)
	}
}
7131
7132// When an RPC is canceled, it's possible that the last Recv() returns before
7133// all call options' after are executed.
7134func (s) TestCanceledRPCCallOptionRace(t *testing.T) {
7135	ss := &stubServer{
7136		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
7137			err := stream.Send(&testpb.StreamingOutputCallResponse{})
7138			if err != nil {
7139				return err
7140			}
7141			<-stream.Context().Done()
7142			return nil
7143		},
7144	}
7145	if err := ss.Start(nil); err != nil {
7146		t.Fatalf("Error starting endpoint server: %v", err)
7147	}
7148	defer ss.Stop()
7149
7150	const count = 1000
7151	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
7152	defer cancel()
7153
7154	var wg sync.WaitGroup
7155	wg.Add(count)
7156	for i := 0; i < count; i++ {
7157		go func() {
7158			defer wg.Done()
7159			var p peer.Peer
7160			ctx, cancel := context.WithCancel(ctx)
7161			defer cancel()
7162			stream, err := ss.client.FullDuplexCall(ctx, grpc.Peer(&p))
7163			if err != nil {
7164				t.Errorf("_.FullDuplexCall(_) = _, %v", err)
7165				return
7166			}
7167			if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
7168				t.Errorf("_ has error %v while sending", err)
7169				return
7170			}
7171			if _, err := stream.Recv(); err != nil {
7172				t.Errorf("%v.Recv() = %v", stream, err)
7173				return
7174			}
7175			cancel()
7176			if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
7177				t.Errorf("%v compleled with error %v, want %s", stream, err, codes.Canceled)
7178				return
7179			}
7180			// If recv returns before call options are executed, peer.Addr is not set,
7181			// fail the test.
7182			if p.Addr == nil {
7183				t.Errorf("peer.Addr is nil, want non-nil")
7184				return
7185			}
7186		}()
7187	}
7188	wg.Wait()
7189}
7190