1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19//go:generate protoc --go_out=plugins=grpc:. codec_perf/perf.proto
20//go:generate protoc --go_out=plugins=grpc:. grpc_testing/test.proto
21
22package test
23
24import (
25	"bufio"
26	"bytes"
27	"context"
28	"crypto/tls"
29	"errors"
30	"flag"
31	"fmt"
32	"io"
33	"math"
34	"net"
35	"net/http"
36	"os"
37	"reflect"
38	"runtime"
39	"strings"
40	"sync"
41	"sync/atomic"
42	"syscall"
43	"testing"
44	"time"
45
46	"github.com/golang/protobuf/proto"
47	anypb "github.com/golang/protobuf/ptypes/any"
48	"golang.org/x/net/http2"
49	"golang.org/x/net/http2/hpack"
50	spb "google.golang.org/genproto/googleapis/rpc/status"
51	"google.golang.org/grpc"
52	"google.golang.org/grpc/balancer/roundrobin"
53	"google.golang.org/grpc/codes"
54	"google.golang.org/grpc/connectivity"
55	"google.golang.org/grpc/credentials"
56	"google.golang.org/grpc/encoding"
57	_ "google.golang.org/grpc/encoding/gzip"
58	_ "google.golang.org/grpc/grpclog/glogger"
59	"google.golang.org/grpc/health"
60	healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
61	healthpb "google.golang.org/grpc/health/grpc_health_v1"
62	"google.golang.org/grpc/internal/channelz"
63	"google.golang.org/grpc/internal/grpcsync"
64	"google.golang.org/grpc/internal/grpctest"
65	"google.golang.org/grpc/internal/leakcheck"
66	"google.golang.org/grpc/internal/testutils"
67	"google.golang.org/grpc/internal/transport"
68	"google.golang.org/grpc/metadata"
69	"google.golang.org/grpc/peer"
70	"google.golang.org/grpc/resolver"
71	"google.golang.org/grpc/resolver/manual"
72	_ "google.golang.org/grpc/resolver/passthrough"
73	"google.golang.org/grpc/stats"
74	"google.golang.org/grpc/status"
75	"google.golang.org/grpc/tap"
76	testpb "google.golang.org/grpc/test/grpc_testing"
77	"google.golang.org/grpc/testdata"
78)
79
80func init() {
81	channelz.TurnOn()
82}
83
84type s struct{}
85
86var lcFailed uint32
87
88type errorer struct {
89	t *testing.T
90}
91
92func (e errorer) Errorf(format string, args ...interface{}) {
93	atomic.StoreUint32(&lcFailed, 1)
94	e.t.Errorf(format, args...)
95}
96
97func (s) Teardown(t *testing.T) {
98	if atomic.LoadUint32(&lcFailed) == 1 {
99		return
100	}
101	leakcheck.Check(errorer{t: t})
102	if atomic.LoadUint32(&lcFailed) == 1 {
103		t.Log("Leak check disabled for future tests")
104	}
105}
106
107func Test(t *testing.T) {
108	grpctest.RunSubTests(t, s{})
109}
110
111var (
112	// For headers:
113	testMetadata = metadata.MD{
114		"key1":     []string{"value1"},
115		"key2":     []string{"value2"},
116		"key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})},
117	}
118	testMetadata2 = metadata.MD{
119		"key1": []string{"value12"},
120		"key2": []string{"value22"},
121	}
122	// For trailers:
123	testTrailerMetadata = metadata.MD{
124		"tkey1":     []string{"trailerValue1"},
125		"tkey2":     []string{"trailerValue2"},
126		"tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})},
127	}
128	testTrailerMetadata2 = metadata.MD{
129		"tkey1": []string{"trailerValue12"},
130		"tkey2": []string{"trailerValue22"},
131	}
132	// capital "Key" is illegal in HTTP/2.
133	malformedHTTP2Metadata = metadata.MD{
134		"Key": []string{"foo"},
135	}
136	testAppUA     = "myApp1/1.0 myApp2/0.9"
137	failAppUA     = "fail-this-RPC"
138	detailedError = status.ErrorProto(&spb.Status{
139		Code:    int32(codes.DataLoss),
140		Message: "error for testing: " + failAppUA,
141		Details: []*anypb.Any{{
142			TypeUrl: "url",
143			Value:   []byte{6, 0, 0, 6, 1, 3},
144		}},
145	})
146)
147
148var raceMode bool // set by race.go in race mode
149
150type testServer struct {
151	security           string // indicate the authentication protocol used by this server.
152	earlyFail          bool   // whether to error out the execution of a service handler prematurely.
153	setAndSendHeader   bool   // whether to call setHeader and sendHeader.
154	setHeaderOnly      bool   // whether to only call setHeader, not sendHeader.
155	multipleSetTrailer bool   // whether to call setTrailer multiple times.
156	unaryCallSleepTime time.Duration
157}
158
159func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
160	if md, ok := metadata.FromIncomingContext(ctx); ok {
161		// For testing purpose, returns an error if user-agent is failAppUA.
162		// To test that client gets the correct error.
163		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
164			return nil, detailedError
165		}
166		var str []string
167		for _, entry := range md["user-agent"] {
168			str = append(str, "ua", entry)
169		}
170		grpc.SendHeader(ctx, metadata.Pairs(str...))
171	}
172	return new(testpb.Empty), nil
173}
174
175func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
176	if size < 0 {
177		return nil, fmt.Errorf("requested a response with invalid length %d", size)
178	}
179	body := make([]byte, size)
180	switch t {
181	case testpb.PayloadType_COMPRESSABLE:
182	case testpb.PayloadType_UNCOMPRESSABLE:
183		return nil, fmt.Errorf("PayloadType UNCOMPRESSABLE is not supported")
184	default:
185		return nil, fmt.Errorf("unsupported payload type: %d", t)
186	}
187	return &testpb.Payload{
188		Type: t,
189		Body: body,
190	}, nil
191}
192
193func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
194	md, ok := metadata.FromIncomingContext(ctx)
195	if ok {
196		if _, exists := md[":authority"]; !exists {
197			return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
198		}
199		if s.setAndSendHeader {
200			if err := grpc.SetHeader(ctx, md); err != nil {
201				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
202			}
203			if err := grpc.SendHeader(ctx, testMetadata2); err != nil {
204				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err)
205			}
206		} else if s.setHeaderOnly {
207			if err := grpc.SetHeader(ctx, md); err != nil {
208				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
209			}
210			if err := grpc.SetHeader(ctx, testMetadata2); err != nil {
211				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err)
212			}
213		} else {
214			if err := grpc.SendHeader(ctx, md); err != nil {
215				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
216			}
217		}
218		if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
219			return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
220		}
221		if s.multipleSetTrailer {
222			if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil {
223				return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err)
224			}
225		}
226	}
227	pr, ok := peer.FromContext(ctx)
228	if !ok {
229		return nil, status.Error(codes.DataLoss, "failed to get peer from ctx")
230	}
231	if pr.Addr == net.Addr(nil) {
232		return nil, status.Error(codes.DataLoss, "failed to get peer address")
233	}
234	if s.security != "" {
235		// Check Auth info
236		var authType, serverName string
237		switch info := pr.AuthInfo.(type) {
238		case credentials.TLSInfo:
239			authType = info.AuthType()
240			serverName = info.State.ServerName
241		default:
242			return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type")
243		}
244		if authType != s.security {
245			return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security)
246		}
247		if serverName != "x.test.youtube.com" {
248			return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName)
249		}
250	}
251	// Simulate some service delay.
252	time.Sleep(s.unaryCallSleepTime)
253
254	payload, err := newPayload(in.GetResponseType(), in.GetResponseSize())
255	if err != nil {
256		return nil, err
257	}
258
259	return &testpb.SimpleResponse{
260		Payload: payload,
261	}, nil
262}
263
264func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
265	if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
266		if _, exists := md[":authority"]; !exists {
267			return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
268		}
269		// For testing purpose, returns an error if user-agent is failAppUA.
270		// To test that client gets the correct error.
271		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
272			return status.Error(codes.DataLoss, "error for testing: "+failAppUA)
273		}
274	}
275	cs := args.GetResponseParameters()
276	for _, c := range cs {
277		if us := c.GetIntervalUs(); us > 0 {
278			time.Sleep(time.Duration(us) * time.Microsecond)
279		}
280
281		payload, err := newPayload(args.GetResponseType(), c.GetSize())
282		if err != nil {
283			return err
284		}
285
286		if err := stream.Send(&testpb.StreamingOutputCallResponse{
287			Payload: payload,
288		}); err != nil {
289			return err
290		}
291	}
292	return nil
293}
294
295func (s *testServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
296	var sum int
297	for {
298		in, err := stream.Recv()
299		if err == io.EOF {
300			return stream.SendAndClose(&testpb.StreamingInputCallResponse{
301				AggregatedPayloadSize: int32(sum),
302			})
303		}
304		if err != nil {
305			return err
306		}
307		p := in.GetPayload().GetBody()
308		sum += len(p)
309		if s.earlyFail {
310			return status.Error(codes.NotFound, "not found")
311		}
312	}
313}
314
315func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
316	md, ok := metadata.FromIncomingContext(stream.Context())
317	if ok {
318		if s.setAndSendHeader {
319			if err := stream.SetHeader(md); err != nil {
320				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
321			}
322			if err := stream.SendHeader(testMetadata2); err != nil {
323				return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
324			}
325		} else if s.setHeaderOnly {
326			if err := stream.SetHeader(md); err != nil {
327				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
328			}
329			if err := stream.SetHeader(testMetadata2); err != nil {
330				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
331			}
332		} else {
333			if err := stream.SendHeader(md); err != nil {
334				return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
335			}
336		}
337		stream.SetTrailer(testTrailerMetadata)
338		if s.multipleSetTrailer {
339			stream.SetTrailer(testTrailerMetadata2)
340		}
341	}
342	for {
343		in, err := stream.Recv()
344		if err == io.EOF {
345			// read done.
346			return nil
347		}
348		if err != nil {
349			// to facilitate testSvrWriteStatusEarlyWrite
350			if status.Code(err) == codes.ResourceExhausted {
351				return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
352			}
353			return err
354		}
355		cs := in.GetResponseParameters()
356		for _, c := range cs {
357			if us := c.GetIntervalUs(); us > 0 {
358				time.Sleep(time.Duration(us) * time.Microsecond)
359			}
360
361			payload, err := newPayload(in.GetResponseType(), c.GetSize())
362			if err != nil {
363				return err
364			}
365
366			if err := stream.Send(&testpb.StreamingOutputCallResponse{
367				Payload: payload,
368			}); err != nil {
369				// to facilitate testSvrWriteStatusEarlyWrite
370				if status.Code(err) == codes.ResourceExhausted {
371					return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
372				}
373				return err
374			}
375		}
376	}
377}
378
379func (s *testServer) HalfDuplexCall(stream testpb.TestService_HalfDuplexCallServer) error {
380	var msgBuf []*testpb.StreamingOutputCallRequest
381	for {
382		in, err := stream.Recv()
383		if err == io.EOF {
384			// read done.
385			break
386		}
387		if err != nil {
388			return err
389		}
390		msgBuf = append(msgBuf, in)
391	}
392	for _, m := range msgBuf {
393		cs := m.GetResponseParameters()
394		for _, c := range cs {
395			if us := c.GetIntervalUs(); us > 0 {
396				time.Sleep(time.Duration(us) * time.Microsecond)
397			}
398
399			payload, err := newPayload(m.GetResponseType(), c.GetSize())
400			if err != nil {
401				return err
402			}
403
404			if err := stream.Send(&testpb.StreamingOutputCallResponse{
405				Payload: payload,
406			}); err != nil {
407				return err
408			}
409		}
410	}
411	return nil
412}
413
414type env struct {
415	name         string
416	network      string // The type of network such as tcp, unix, etc.
417	security     string // The security protocol such as TLS, SSH, etc.
418	httpHandler  bool   // whether to use the http.Handler ServerTransport; requires TLS
419	balancer     string // One of "round_robin", "pick_first", "v1", or "".
420	customDialer func(string, string, time.Duration) (net.Conn, error)
421}
422
423func (e env) runnable() bool {
424	if runtime.GOOS == "windows" && e.network == "unix" {
425		return false
426	}
427	return true
428}
429
430func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
431	if e.customDialer != nil {
432		return e.customDialer(e.network, addr, timeout)
433	}
434	return net.DialTimeout(e.network, addr, timeout)
435}
436
437var (
438	tcpClearEnv   = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"}
439	tcpTLSEnv     = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"}
440	tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"}
441	tcpTLSRREnv   = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"}
442	handlerEnv    = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"}
443	noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"}
444	allEnv        = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv}
445)
446
447var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
448
449func listTestEnv() (envs []env) {
450	if *onlyEnv != "" {
451		for _, e := range allEnv {
452			if e.name == *onlyEnv {
453				if !e.runnable() {
454					panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS))
455				}
456				return []env{e}
457			}
458		}
459		panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv))
460	}
461	for _, e := range allEnv {
462		if e.runnable() {
463			envs = append(envs, e)
464		}
465	}
466	return envs
467}
468
469// test is an end-to-end test. It should be created with the newTest
470// func, modified as needed, and then started with its startServer method.
471// It should be cleaned up with the tearDown method.
472type test struct {
473	t *testing.T
474	e env
475
476	ctx    context.Context // valid for life of test, before tearDown
477	cancel context.CancelFunc
478
479	// Configurable knobs, after newTest returns:
480	testServer              testpb.TestServiceServer // nil means none
481	healthServer            *health.Server           // nil means disabled
482	maxStream               uint32
483	tapHandle               tap.ServerInHandle
484	maxMsgSize              *int
485	maxClientReceiveMsgSize *int
486	maxClientSendMsgSize    *int
487	maxServerReceiveMsgSize *int
488	maxServerSendMsgSize    *int
489	maxClientHeaderListSize *uint32
490	maxServerHeaderListSize *uint32
491	userAgent               string
492	// clientCompression and serverCompression are set to test the deprecated API
493	// WithCompressor and WithDecompressor.
494	clientCompression bool
495	serverCompression bool
496	// clientUseCompression is set to test the new compressor registration API UseCompressor.
497	clientUseCompression bool
498	// clientNopCompression is set to create a compressor whose type is not supported.
499	clientNopCompression        bool
500	unaryClientInt              grpc.UnaryClientInterceptor
501	streamClientInt             grpc.StreamClientInterceptor
502	unaryServerInt              grpc.UnaryServerInterceptor
503	streamServerInt             grpc.StreamServerInterceptor
504	unknownHandler              grpc.StreamHandler
505	sc                          <-chan grpc.ServiceConfig
506	customCodec                 encoding.Codec
507	serverInitialWindowSize     int32
508	serverInitialConnWindowSize int32
509	clientInitialWindowSize     int32
510	clientInitialConnWindowSize int32
511	perRPCCreds                 credentials.PerRPCCredentials
512	customDialOptions           []grpc.DialOption
513	customServerOptions         []grpc.ServerOption
514	resolverScheme              string
515
516	// All test dialing is blocking by default. Set this to true if dial
517	// should be non-blocking.
518	nonBlockingDial bool
519
520	// srv and srvAddr are set once startServer is called.
521	srv     stopper
522	srvAddr string
523
524	// srvs and srvAddrs are set once startServers is called.
525	srvs     []*grpc.Server
526	srvAddrs []string
527
528	cc          *grpc.ClientConn // nil until requested via clientConn
529	restoreLogs func()           // nil unless declareLogNoise is used
530}
531
532type stopper interface {
533	Stop()
534	GracefulStop()
535}
536
537func (te *test) tearDown() {
538	if te.cancel != nil {
539		te.cancel()
540		te.cancel = nil
541	}
542
543	if te.cc != nil {
544		te.cc.Close()
545		te.cc = nil
546	}
547
548	if te.restoreLogs != nil {
549		te.restoreLogs()
550		te.restoreLogs = nil
551	}
552
553	if te.srv != nil {
554		te.srv.Stop()
555	}
556	if len(te.srvs) != 0 {
557		for _, s := range te.srvs {
558			s.Stop()
559		}
560	}
561}
562
563// newTest returns a new test using the provided testing.T and
564// environment.  It is returned with default values. Tests should
565// modify it before calling its startServer and clientConn methods.
566func newTest(t *testing.T, e env) *test {
567	te := &test{
568		t:         t,
569		e:         e,
570		maxStream: math.MaxUint32,
571	}
572	te.ctx, te.cancel = context.WithCancel(context.Background())
573	return te
574}
575
576func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
577	te.testServer = ts
578	te.t.Logf("Running test in %s environment...", te.e.name)
579	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
580	if te.maxMsgSize != nil {
581		sopts = append(sopts, grpc.MaxMsgSize(*te.maxMsgSize))
582	}
583	if te.maxServerReceiveMsgSize != nil {
584		sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
585	}
586	if te.maxServerSendMsgSize != nil {
587		sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
588	}
589	if te.maxServerHeaderListSize != nil {
590		sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize))
591	}
592	if te.tapHandle != nil {
593		sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
594	}
595	if te.serverCompression {
596		sopts = append(sopts,
597			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
598			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
599		)
600	}
601	if te.unaryServerInt != nil {
602		sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
603	}
604	if te.streamServerInt != nil {
605		sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
606	}
607	if te.unknownHandler != nil {
608		sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
609	}
610	if te.serverInitialWindowSize > 0 {
611		sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
612	}
613	if te.serverInitialConnWindowSize > 0 {
614		sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
615	}
616	la := "localhost:0"
617	switch te.e.network {
618	case "unix":
619		la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
620		syscall.Unlink(la)
621	}
622	lis, err := listen(te.e.network, la)
623	if err != nil {
624		te.t.Fatalf("Failed to listen: %v", err)
625	}
626	switch te.e.security {
627	case "tls":
628		creds, err := credentials.NewServerTLSFromFile(testdata.Path("server1.pem"), testdata.Path("server1.key"))
629		if err != nil {
630			te.t.Fatalf("Failed to generate credentials %v", err)
631		}
632		sopts = append(sopts, grpc.Creds(creds))
633	case "clientTimeoutCreds":
634		sopts = append(sopts, grpc.Creds(&clientTimeoutCreds{}))
635	}
636	sopts = append(sopts, te.customServerOptions...)
637	s := grpc.NewServer(sopts...)
638	te.srv = s
639	if te.healthServer != nil {
640		healthgrpc.RegisterHealthServer(s, te.healthServer)
641	}
642	if te.testServer != nil {
643		testpb.RegisterTestServiceServer(s, te.testServer)
644	}
645	addr := la
646	switch te.e.network {
647	case "unix":
648	default:
649		_, port, err := net.SplitHostPort(lis.Addr().String())
650		if err != nil {
651			te.t.Fatalf("Failed to parse listener address: %v", err)
652		}
653		addr = "localhost:" + port
654	}
655
656	te.srvAddr = addr
657
658	if te.e.httpHandler {
659		if te.e.security != "tls" {
660			te.t.Fatalf("unsupported environment settings")
661		}
662		cert, err := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
663		if err != nil {
664			te.t.Fatal("Error creating TLS certificate: ", err)
665		}
666		hs := &http.Server{
667			Handler: s,
668		}
669		err = http2.ConfigureServer(hs, &http2.Server{
670			MaxConcurrentStreams: te.maxStream,
671		})
672		if err != nil {
673			te.t.Fatal("error starting http2 server: ", err)
674		}
675		hs.TLSConfig.Certificates = []tls.Certificate{cert}
676		tlsListener := tls.NewListener(lis, hs.TLSConfig)
677		whs := &wrapHS{Listener: tlsListener, s: hs, conns: make(map[net.Conn]bool)}
678		te.srv = whs
679		go hs.Serve(whs)
680
681		return lis
682	}
683
684	go s.Serve(lis)
685	return lis
686}
687
688// TODO: delete wrapHS and wrapConn when Go1.6 and Go1.7 support are gone and
689// call s.Close and s.Shutdown instead.
690type wrapHS struct {
691	sync.Mutex
692	net.Listener
693	s     *http.Server
694	conns map[net.Conn]bool
695}
696
697func (w *wrapHS) Accept() (net.Conn, error) {
698	c, err := w.Listener.Accept()
699	if err != nil {
700		return nil, err
701	}
702	w.Lock()
703	if w.conns == nil {
704		w.Unlock()
705		c.Close()
706		return nil, errors.New("connection after listener closed")
707	}
708	w.conns[&wrapConn{Conn: c, hs: w}] = true
709	w.Unlock()
710	return c, nil
711}
712
713func (w *wrapHS) Stop() {
714	w.Listener.Close()
715	w.Lock()
716	conns := w.conns
717	w.conns = nil
718	w.Unlock()
719	for c := range conns {
720		c.Close()
721	}
722}
723
724// Poll for now..
725func (w *wrapHS) GracefulStop() {
726	w.Listener.Close()
727	for {
728		w.Lock()
729		l := len(w.conns)
730		w.Unlock()
731		if l == 0 {
732			return
733		}
734		time.Sleep(50 * time.Millisecond)
735	}
736}
737
738type wrapConn struct {
739	net.Conn
740	hs *wrapHS
741}
742
743func (w *wrapConn) Close() error {
744	w.hs.Lock()
745	delete(w.hs.conns, w.Conn)
746	w.hs.Unlock()
747	return w.Conn.Close()
748}
749
750func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper {
751	l := te.listenAndServe(ts, listenWithConnControl)
752	return l.(*listenerWrapper)
753}
754
755// startServer starts a gRPC server listening. Callers should defer a
756// call to te.tearDown to clean up.
757func (te *test) startServer(ts testpb.TestServiceServer) {
758	te.listenAndServe(ts, net.Listen)
759}
760
761type nopCompressor struct {
762	grpc.Compressor
763}
764
765// NewNopCompressor creates a compressor to test the case that type is not supported.
766func NewNopCompressor() grpc.Compressor {
767	return &nopCompressor{grpc.NewGZIPCompressor()}
768}
769
770func (c *nopCompressor) Type() string {
771	return "nop"
772}
773
774type nopDecompressor struct {
775	grpc.Decompressor
776}
777
778// NewNopDecompressor creates a decompressor to test the case that type is not supported.
779func NewNopDecompressor() grpc.Decompressor {
780	return &nopDecompressor{grpc.NewGZIPDecompressor()}
781}
782
783func (d *nopDecompressor) Type() string {
784	return "nop"
785}
786
787func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) {
788	opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent))
789
790	if te.sc != nil {
791		opts = append(opts, grpc.WithServiceConfig(te.sc))
792	}
793
794	if te.clientCompression {
795		opts = append(opts,
796			grpc.WithCompressor(grpc.NewGZIPCompressor()),
797			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
798		)
799	}
800	if te.clientUseCompression {
801		opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
802	}
803	if te.clientNopCompression {
804		opts = append(opts,
805			grpc.WithCompressor(NewNopCompressor()),
806			grpc.WithDecompressor(NewNopDecompressor()),
807		)
808	}
809	if te.unaryClientInt != nil {
810		opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt))
811	}
812	if te.streamClientInt != nil {
813		opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
814	}
815	if te.maxMsgSize != nil {
816		opts = append(opts, grpc.WithMaxMsgSize(*te.maxMsgSize))
817	}
818	if te.maxClientReceiveMsgSize != nil {
819		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
820	}
821	if te.maxClientSendMsgSize != nil {
822		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
823	}
824	if te.maxClientHeaderListSize != nil {
825		opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize))
826	}
827	switch te.e.security {
828	case "tls":
829		creds, err := credentials.NewClientTLSFromFile(testdata.Path("ca.pem"), "x.test.youtube.com")
830		if err != nil {
831			te.t.Fatalf("Failed to load credentials: %v", err)
832		}
833		opts = append(opts, grpc.WithTransportCredentials(creds))
834	case "clientTimeoutCreds":
835		opts = append(opts, grpc.WithTransportCredentials(&clientTimeoutCreds{}))
836	case "empty":
837		// Don't add any transport creds option.
838	default:
839		opts = append(opts, grpc.WithInsecure())
840	}
841	// TODO(bar) switch balancer case "pick_first".
842	var scheme string
843	if te.resolverScheme == "" {
844		scheme = "passthrough:///"
845	} else {
846		scheme = te.resolverScheme + ":///"
847	}
848	switch te.e.balancer {
849	case "v1":
850		opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
851	case "round_robin":
852		opts = append(opts, grpc.WithBalancerName(roundrobin.Name))
853	}
854	if te.clientInitialWindowSize > 0 {
855		opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
856	}
857	if te.clientInitialConnWindowSize > 0 {
858		opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
859	}
860	if te.perRPCCreds != nil {
861		opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
862	}
863	if te.customCodec != nil {
864		opts = append(opts, grpc.WithDefaultCallOptions(grpc.ForceCodec(te.customCodec)))
865	}
866	if !te.nonBlockingDial && te.srvAddr != "" {
867		// Only do a blocking dial if server is up.
868		opts = append(opts, grpc.WithBlock())
869	}
870	if te.srvAddr == "" {
871		te.srvAddr = "client.side.only.test"
872	}
873	opts = append(opts, te.customDialOptions...)
874	return opts, scheme
875}
876
877func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) {
878	if te.cc != nil {
879		return te.cc, nil
880	}
881	opts, scheme := te.configDial()
882	dw := &dialerWrapper{}
883	// overwrite the dialer before
884	opts = append(opts, grpc.WithDialer(dw.dialer))
885	var err error
886	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
887	if err != nil {
888		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
889	}
890	return te.cc, dw
891}
892
893func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn {
894	if te.cc != nil {
895		return te.cc
896	}
897	var scheme string
898	opts, scheme = te.configDial(opts...)
899	var err error
900	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
901	if err != nil {
902		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
903	}
904	return te.cc
905}
906
907func (te *test) declareLogNoise(phrases ...string) {
908	te.restoreLogs = declareLogNoise(te.t, phrases...)
909}
910
911func (te *test) withServerTester(fn func(st *serverTester)) {
912	c, err := te.e.dialer(te.srvAddr, 10*time.Second)
913	if err != nil {
914		te.t.Fatal(err)
915	}
916	defer c.Close()
917	if te.e.security == "tls" {
918		c = tls.Client(c, &tls.Config{
919			InsecureSkipVerify: true,
920			NextProtos:         []string{http2.NextProtoTLS},
921		})
922	}
923	st := newServerTesterFromConn(te.t, c)
924	st.greet()
925	fn(st)
926}
927
928type lazyConn struct {
929	net.Conn
930	beLazy int32
931}
932
933func (l *lazyConn) Write(b []byte) (int, error) {
934	if atomic.LoadInt32(&(l.beLazy)) == 1 {
935		time.Sleep(time.Second)
936	}
937	return l.Conn.Write(b)
938}
939
940func (s) TestContextDeadlineNotIgnored(t *testing.T) {
941	e := noBalancerEnv
942	var lc *lazyConn
943	e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) {
944		conn, err := net.DialTimeout(network, addr, timeout)
945		if err != nil {
946			return nil, err
947		}
948		lc = &lazyConn{Conn: conn}
949		return lc, nil
950	}
951
952	te := newTest(t, e)
953	te.startServer(&testServer{security: e.security})
954	defer te.tearDown()
955
956	cc := te.clientConn()
957	tc := testpb.NewTestServiceClient(cc)
958	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
959		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
960	}
961	atomic.StoreInt32(&(lc.beLazy), 1)
962	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
963	defer cancel()
964	t1 := time.Now()
965	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
966		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
967	}
968	if time.Since(t1) > 2*time.Second {
969		t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
970	}
971}
972
973func (s) TestTimeoutOnDeadServer(t *testing.T) {
974	for _, e := range listTestEnv() {
975		testTimeoutOnDeadServer(t, e)
976	}
977}
978
979func testTimeoutOnDeadServer(t *testing.T, e env) {
980	te := newTest(t, e)
981	te.userAgent = testAppUA
982	te.declareLogNoise(
983		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
984		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
985		"grpc: addrConn.resetTransport failed to create client transport: connection error",
986	)
987	te.startServer(&testServer{security: e.security})
988	defer te.tearDown()
989
990	cc := te.clientConn()
991	tc := testpb.NewTestServiceClient(cc)
992	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
993		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
994	}
995	te.srv.Stop()
996
997	// Wait for the client to notice the connection is gone.
998	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
999	state := cc.GetState()
1000	for ; state == connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
1001	}
1002	cancel()
1003	if state == connectivity.Ready {
1004		t.Fatalf("Timed out waiting for non-ready state")
1005	}
1006	ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond)
1007	_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
1008	cancel()
1009	if e.balancer != "" && status.Code(err) != codes.DeadlineExceeded {
1010		// If e.balancer == nil, the ac will stop reconnecting because the dialer returns non-temp error,
1011		// the error will be an internal error.
1012		t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
1013	}
1014	awaitNewConnLogOutput()
1015}
1016
1017func (s) TestServerGracefulStopIdempotent(t *testing.T) {
1018	for _, e := range listTestEnv() {
1019		if e.name == "handler-tls" {
1020			continue
1021		}
1022		testServerGracefulStopIdempotent(t, e)
1023	}
1024}
1025
1026func testServerGracefulStopIdempotent(t *testing.T, e env) {
1027	te := newTest(t, e)
1028	te.userAgent = testAppUA
1029	te.startServer(&testServer{security: e.security})
1030	defer te.tearDown()
1031
1032	for i := 0; i < 3; i++ {
1033		te.srv.GracefulStop()
1034	}
1035}
1036
1037func (s) TestServerGoAway(t *testing.T) {
1038	for _, e := range listTestEnv() {
1039		if e.name == "handler-tls" {
1040			continue
1041		}
1042		testServerGoAway(t, e)
1043	}
1044}
1045
1046func testServerGoAway(t *testing.T, e env) {
1047	te := newTest(t, e)
1048	te.userAgent = testAppUA
1049	te.startServer(&testServer{security: e.security})
1050	defer te.tearDown()
1051
1052	cc := te.clientConn()
1053	tc := testpb.NewTestServiceClient(cc)
1054	// Finish an RPC to make sure the connection is good.
1055	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1056	defer cancel()
1057	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1058		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1059	}
1060	ch := make(chan struct{})
1061	go func() {
1062		te.srv.GracefulStop()
1063		close(ch)
1064	}()
1065	// Loop until the server side GoAway signal is propagated to the client.
1066	for {
1067		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1068		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
1069			cancel()
1070			break
1071		}
1072		cancel()
1073	}
1074	// A new RPC should fail.
1075	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
1076	defer cancel()
1077	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
1078		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
1079	}
1080	<-ch
1081	awaitNewConnLogOutput()
1082}
1083
1084func (s) TestServerGoAwayPendingRPC(t *testing.T) {
1085	for _, e := range listTestEnv() {
1086		if e.name == "handler-tls" {
1087			continue
1088		}
1089		testServerGoAwayPendingRPC(t, e)
1090	}
1091}
1092
1093func testServerGoAwayPendingRPC(t *testing.T, e env) {
1094	te := newTest(t, e)
1095	te.userAgent = testAppUA
1096	te.declareLogNoise(
1097		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1098		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1099		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1100	)
1101	te.startServer(&testServer{security: e.security})
1102	defer te.tearDown()
1103
1104	cc := te.clientConn()
1105	tc := testpb.NewTestServiceClient(cc)
1106	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
1107	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
1108	if err != nil {
1109		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1110	}
1111	// Finish an RPC to make sure the connection is good.
1112	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1113		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1114	}
1115	ch := make(chan struct{})
1116	go func() {
1117		te.srv.GracefulStop()
1118		close(ch)
1119	}()
1120	// Loop until the server side GoAway signal is propagated to the client.
1121	start := time.Now()
1122	errored := false
1123	for time.Since(start) < time.Second {
1124		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1125		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
1126		cancel()
1127		if err != nil {
1128			errored = true
1129			break
1130		}
1131	}
1132	if !errored {
1133		t.Fatalf("GoAway never received by client")
1134	}
1135	respParam := []*testpb.ResponseParameters{{Size: 1}}
1136	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1137	if err != nil {
1138		t.Fatal(err)
1139	}
1140	req := &testpb.StreamingOutputCallRequest{
1141		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1142		ResponseParameters: respParam,
1143		Payload:            payload,
1144	}
1145	// The existing RPC should be still good to proceed.
1146	if err := stream.Send(req); err != nil {
1147		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
1148	}
1149	if _, err := stream.Recv(); err != nil {
1150		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
1151	}
1152	// The RPC will run until canceled.
1153	cancel()
1154	<-ch
1155	awaitNewConnLogOutput()
1156}
1157
1158func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
1159	for _, e := range listTestEnv() {
1160		if e.name == "handler-tls" {
1161			continue
1162		}
1163		testServerMultipleGoAwayPendingRPC(t, e)
1164	}
1165}
1166
1167func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
1168	te := newTest(t, e)
1169	te.userAgent = testAppUA
1170	te.declareLogNoise(
1171		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1172		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1173		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1174	)
1175	te.startServer(&testServer{security: e.security})
1176	defer te.tearDown()
1177
1178	cc := te.clientConn()
1179	tc := testpb.NewTestServiceClient(cc)
1180	ctx, cancel := context.WithCancel(context.Background())
1181	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
1182	if err != nil {
1183		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1184	}
1185	// Finish an RPC to make sure the connection is good.
1186	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1187		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1188	}
1189	ch1 := make(chan struct{})
1190	go func() {
1191		te.srv.GracefulStop()
1192		close(ch1)
1193	}()
1194	ch2 := make(chan struct{})
1195	go func() {
1196		te.srv.GracefulStop()
1197		close(ch2)
1198	}()
1199	// Loop until the server side GoAway signal is propagated to the client.
1200	for {
1201		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1202		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1203			cancel()
1204			break
1205		}
1206		cancel()
1207	}
1208	select {
1209	case <-ch1:
1210		t.Fatal("GracefulStop() terminated early")
1211	case <-ch2:
1212		t.Fatal("GracefulStop() terminated early")
1213	default:
1214	}
1215	respParam := []*testpb.ResponseParameters{
1216		{
1217			Size: 1,
1218		},
1219	}
1220	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1221	if err != nil {
1222		t.Fatal(err)
1223	}
1224	req := &testpb.StreamingOutputCallRequest{
1225		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1226		ResponseParameters: respParam,
1227		Payload:            payload,
1228	}
1229	// The existing RPC should be still good to proceed.
1230	if err := stream.Send(req); err != nil {
1231		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
1232	}
1233	if _, err := stream.Recv(); err != nil {
1234		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
1235	}
1236	if err := stream.CloseSend(); err != nil {
1237		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
1238	}
1239	<-ch1
1240	<-ch2
1241	cancel()
1242	awaitNewConnLogOutput()
1243}
1244
1245func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
1246	for _, e := range listTestEnv() {
1247		if e.name == "handler-tls" {
1248			continue
1249		}
1250		testConcurrentClientConnCloseAndServerGoAway(t, e)
1251	}
1252}
1253
1254func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
1255	te := newTest(t, e)
1256	te.userAgent = testAppUA
1257	te.declareLogNoise(
1258		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1259		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1260		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1261	)
1262	te.startServer(&testServer{security: e.security})
1263	defer te.tearDown()
1264
1265	cc := te.clientConn()
1266	tc := testpb.NewTestServiceClient(cc)
1267	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1268		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1269	}
1270	ch := make(chan struct{})
1271	// Close ClientConn and Server concurrently.
1272	go func() {
1273		te.srv.GracefulStop()
1274		close(ch)
1275	}()
1276	go func() {
1277		cc.Close()
1278	}()
1279	<-ch
1280}
1281
1282func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
1283	for _, e := range listTestEnv() {
1284		if e.name == "handler-tls" {
1285			continue
1286		}
1287		testConcurrentServerStopAndGoAway(t, e)
1288	}
1289}
1290
1291func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
1292	te := newTest(t, e)
1293	te.userAgent = testAppUA
1294	te.declareLogNoise(
1295		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1296		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1297		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1298	)
1299	te.startServer(&testServer{security: e.security})
1300	defer te.tearDown()
1301
1302	cc := te.clientConn()
1303	tc := testpb.NewTestServiceClient(cc)
1304	stream, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true))
1305	if err != nil {
1306		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1307	}
1308	// Finish an RPC to make sure the connection is good.
1309	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1310		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
1311	}
1312	ch := make(chan struct{})
1313	go func() {
1314		te.srv.GracefulStop()
1315		close(ch)
1316	}()
1317	// Loop until the server side GoAway signal is propagated to the client.
1318	for {
1319		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
1320		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
1321			cancel()
1322			break
1323		}
1324		cancel()
1325	}
1326	// Stop the server and close all the connections.
1327	te.srv.Stop()
1328	respParam := []*testpb.ResponseParameters{
1329		{
1330			Size: 1,
1331		},
1332	}
1333	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
1334	if err != nil {
1335		t.Fatal(err)
1336	}
1337	req := &testpb.StreamingOutputCallRequest{
1338		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1339		ResponseParameters: respParam,
1340		Payload:            payload,
1341	}
1342	sendStart := time.Now()
1343	for {
1344		if err := stream.Send(req); err == io.EOF {
1345			// stream.Send should eventually send io.EOF
1346			break
1347		} else if err != nil {
1348			// Send should never return a transport-level error.
1349			t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
1350		}
1351		if time.Since(sendStart) > 2*time.Second {
1352			t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
1353		}
1354		time.Sleep(time.Millisecond)
1355	}
1356	if _, err := stream.Recv(); err == nil || err == io.EOF {
1357		t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
1358	}
1359	<-ch
1360	awaitNewConnLogOutput()
1361}
1362
1363func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
1364	for _, e := range listTestEnv() {
1365		if e.name == "handler-tls" {
1366			continue
1367		}
1368		testClientConnCloseAfterGoAwayWithActiveStream(t, e)
1369	}
1370}
1371
1372func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
1373	te := newTest(t, e)
1374	te.startServer(&testServer{security: e.security})
1375	defer te.tearDown()
1376	cc := te.clientConn()
1377	tc := testpb.NewTestServiceClient(cc)
1378
1379	ctx, cancel := context.WithCancel(context.Background())
1380	defer cancel()
1381	if _, err := tc.FullDuplexCall(ctx); err != nil {
1382		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
1383	}
1384	done := make(chan struct{})
1385	go func() {
1386		te.srv.GracefulStop()
1387		close(done)
1388	}()
1389	time.Sleep(50 * time.Millisecond)
1390	cc.Close()
1391	timeout := time.NewTimer(time.Second)
1392	select {
1393	case <-done:
1394	case <-timeout.C:
1395		t.Fatalf("Test timed-out.")
1396	}
1397}
1398
1399func (s) TestFailFast(t *testing.T) {
1400	for _, e := range listTestEnv() {
1401		testFailFast(t, e)
1402	}
1403}
1404
1405func testFailFast(t *testing.T, e env) {
1406	te := newTest(t, e)
1407	te.userAgent = testAppUA
1408	te.declareLogNoise(
1409		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1410		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1411		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1412	)
1413	te.startServer(&testServer{security: e.security})
1414	defer te.tearDown()
1415
1416	cc := te.clientConn()
1417	tc := testpb.NewTestServiceClient(cc)
1418	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1419	defer cancel()
1420	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1421		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1422	}
1423	// Stop the server and tear down all the existing connections.
1424	te.srv.Stop()
1425	// Loop until the server teardown is propagated to the client.
1426	for {
1427		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
1428		_, err := tc.EmptyCall(ctx, &testpb.Empty{})
1429		cancel()
1430		if status.Code(err) == codes.Unavailable {
1431			break
1432		}
1433		t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err)
1434		time.Sleep(10 * time.Millisecond)
1435	}
1436	// The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
1437	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
1438		t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
1439	}
1440	if _, err := tc.StreamingInputCall(context.Background()); status.Code(err) != codes.Unavailable {
1441		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
1442	}
1443
1444	awaitNewConnLogOutput()
1445}
1446
1447func testServiceConfigSetup(t *testing.T, e env) *test {
1448	te := newTest(t, e)
1449	te.userAgent = testAppUA
1450	te.declareLogNoise(
1451		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
1452		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
1453		"grpc: addrConn.resetTransport failed to create client transport: connection error",
1454		"Failed to dial : context canceled; please retry.",
1455	)
1456	return te
1457}
1458
1459func newBool(b bool) (a *bool) {
1460	return &b
1461}
1462
1463func newInt(b int) (a *int) {
1464	return &b
1465}
1466
1467func newDuration(b time.Duration) (a *time.Duration) {
1468	a = new(time.Duration)
1469	*a = b
1470	return
1471}
1472
1473func (s) TestGetMethodConfig(t *testing.T) {
1474	te := testServiceConfigSetup(t, tcpClearRREnv)
1475	defer te.tearDown()
1476	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1477	defer rcleanup()
1478
1479	te.resolverScheme = r.Scheme()
1480	cc := te.clientConn()
1481	addrs := []resolver.Address{{Addr: te.srvAddr}}
1482	r.UpdateState(resolver.State{
1483		Addresses: addrs,
1484		ServiceConfig: `{
1485    "methodConfig": [
1486        {
1487            "name": [
1488                {
1489                    "service": "grpc.testing.TestService",
1490                    "method": "EmptyCall"
1491                }
1492            ],
1493            "waitForReady": true,
1494            "timeout": ".001s"
1495        },
1496        {
1497            "name": [
1498                {
1499                    "service": "grpc.testing.TestService"
1500                }
1501            ],
1502            "waitForReady": false
1503        }
1504    ]
1505}`})
1506
1507	tc := testpb.NewTestServiceClient(cc)
1508
1509	// Make sure service config has been processed by grpc.
1510	for {
1511		if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
1512			break
1513		}
1514		time.Sleep(time.Millisecond)
1515	}
1516
1517	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1518	var err error
1519	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
1520		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1521	}
1522
1523	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: `{
1524    "methodConfig": [
1525        {
1526            "name": [
1527                {
1528                    "service": "grpc.testing.TestService",
1529                    "method": "UnaryCall"
1530                }
1531            ],
1532            "waitForReady": true,
1533            "timeout": ".001s"
1534        },
1535        {
1536            "name": [
1537                {
1538                    "service": "grpc.testing.TestService"
1539                }
1540            ],
1541            "waitForReady": false
1542        }
1543    ]
1544}`})
1545
1546	// Make sure service config has been processed by grpc.
1547	for {
1548		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
1549			break
1550		}
1551		time.Sleep(time.Millisecond)
1552	}
1553	// The following RPCs are expected to become fail-fast.
1554	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
1555		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
1556	}
1557}
1558
1559func (s) TestServiceConfigWaitForReady(t *testing.T) {
1560	te := testServiceConfigSetup(t, tcpClearRREnv)
1561	defer te.tearDown()
1562	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1563	defer rcleanup()
1564
1565	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
1566	te.resolverScheme = r.Scheme()
1567	cc := te.clientConn()
1568	addrs := []resolver.Address{{Addr: te.srvAddr}}
1569	r.UpdateState(resolver.State{
1570		Addresses: addrs,
1571		ServiceConfig: `{
1572    "methodConfig": [
1573        {
1574            "name": [
1575                {
1576                    "service": "grpc.testing.TestService",
1577                    "method": "EmptyCall"
1578                },
1579                {
1580                    "service": "grpc.testing.TestService",
1581                    "method": "FullDuplexCall"
1582                }
1583            ],
1584            "waitForReady": false,
1585            "timeout": ".001s"
1586        }
1587    ]
1588}`})
1589
1590	tc := testpb.NewTestServiceClient(cc)
1591
1592	// Make sure service config has been processed by grpc.
1593	for {
1594		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
1595			break
1596		}
1597		time.Sleep(time.Millisecond)
1598	}
1599
1600	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1601	var err error
1602	if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1603		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1604	}
1605	if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1606		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1607	}
1608
1609	// Generate a service config update.
1610	// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
1611	r.UpdateState(resolver.State{
1612		Addresses: addrs,
1613		ServiceConfig: `{
1614    "methodConfig": [
1615        {
1616            "name": [
1617                {
1618                    "service": "grpc.testing.TestService",
1619                    "method": "EmptyCall"
1620                },
1621                {
1622                    "service": "grpc.testing.TestService",
1623                    "method": "FullDuplexCall"
1624                }
1625            ],
1626            "waitForReady": true,
1627            "timeout": ".001s"
1628        }
1629    ]
1630}`})
1631
1632	// Wait for the new service config to take effect.
1633	for {
1634		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
1635			break
1636		}
1637		time.Sleep(time.Millisecond)
1638	}
1639	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
1640	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
1641		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1642	}
1643	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
1644		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1645	}
1646}
1647
1648func (s) TestServiceConfigTimeout(t *testing.T) {
1649	te := testServiceConfigSetup(t, tcpClearRREnv)
1650	defer te.tearDown()
1651	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1652	defer rcleanup()
1653
1654	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1655	te.resolverScheme = r.Scheme()
1656	cc := te.clientConn()
1657	addrs := []resolver.Address{{Addr: te.srvAddr}}
1658	r.UpdateState(resolver.State{
1659		Addresses: addrs,
1660		ServiceConfig: `{
1661    "methodConfig": [
1662        {
1663            "name": [
1664                {
1665                    "service": "grpc.testing.TestService",
1666                    "method": "EmptyCall"
1667                },
1668                {
1669                    "service": "grpc.testing.TestService",
1670                    "method": "FullDuplexCall"
1671                }
1672            ],
1673            "waitForReady": true,
1674            "timeout": "3600s"
1675        }
1676    ]
1677}`})
1678
1679	tc := testpb.NewTestServiceClient(cc)
1680
1681	// Make sure service config has been processed by grpc.
1682	for {
1683		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
1684			break
1685		}
1686		time.Sleep(time.Millisecond)
1687	}
1688
1689	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
1690	var err error
1691	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
1692	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1693		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1694	}
1695	cancel()
1696
1697	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
1698	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1699		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1700	}
1701	cancel()
1702
1703	// Generate a service config update.
1704	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
1705	r.UpdateState(resolver.State{
1706		Addresses: addrs,
1707		ServiceConfig: `{
1708    "methodConfig": [
1709        {
1710            "name": [
1711                {
1712                    "service": "grpc.testing.TestService",
1713                    "method": "EmptyCall"
1714                },
1715                {
1716                    "service": "grpc.testing.TestService",
1717                    "method": "FullDuplexCall"
1718                }
1719            ],
1720            "waitForReady": true,
1721            "timeout": ".000000001s"
1722        }
1723    ]
1724}`})
1725
1726	// Wait for the new service config to take effect.
1727	for {
1728		if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
1729			break
1730		}
1731		time.Sleep(time.Millisecond)
1732	}
1733
1734	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1735	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1736		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1737	}
1738	cancel()
1739
1740	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
1741	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1742		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1743	}
1744	cancel()
1745}
1746
1747func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
1748	e := tcpClearRREnv
1749	r, rcleanup := manual.GenerateAndRegisterManualResolver()
1750	defer rcleanup()
1751
1752	// Setting up values and objects shared across all test cases.
1753	const smallSize = 1
1754	const largeSize = 1024
1755	const extraLargeSize = 2048
1756
1757	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1758	if err != nil {
1759		t.Fatal(err)
1760	}
1761	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1762	if err != nil {
1763		t.Fatal(err)
1764	}
1765	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
1766	if err != nil {
1767		t.Fatal(err)
1768	}
1769
1770	scjs := `{
1771    "methodConfig": [
1772        {
1773            "name": [
1774                {
1775                    "service": "grpc.testing.TestService",
1776                    "method": "UnaryCall"
1777                },
1778                {
1779                    "service": "grpc.testing.TestService",
1780                    "method": "FullDuplexCall"
1781                }
1782            ],
1783            "maxRequestMessageBytes": 2048,
1784            "maxResponseMessageBytes": 2048
1785        }
1786    ]
1787}`
1788
1789	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1790	te1 := testServiceConfigSetup(t, e)
1791	defer te1.tearDown()
1792
1793	te1.resolverScheme = r.Scheme()
1794	te1.nonBlockingDial = true
1795	te1.startServer(&testServer{security: e.security})
1796	cc1 := te1.clientConn()
1797
1798	addrs := []resolver.Address{{Addr: te1.srvAddr}}
1799	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: scjs})
1800	tc := testpb.NewTestServiceClient(cc1)
1801
1802	req := &testpb.SimpleRequest{
1803		ResponseType: testpb.PayloadType_COMPRESSABLE,
1804		ResponseSize: int32(extraLargeSize),
1805		Payload:      smallPayload,
1806	}
1807
1808	for {
1809		if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1810			break
1811		}
1812		time.Sleep(time.Millisecond)
1813	}
1814
1815	// Test for unary RPC recv.
1816	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
1817		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1818	}
1819
1820	// Test for unary RPC send.
1821	req.Payload = extraLargePayload
1822	req.ResponseSize = int32(smallSize)
1823	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1824		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1825	}
1826
1827	// Test for streaming RPC recv.
1828	respParam := []*testpb.ResponseParameters{
1829		{
1830			Size: int32(extraLargeSize),
1831		},
1832	}
1833	sreq := &testpb.StreamingOutputCallRequest{
1834		ResponseType:       testpb.PayloadType_COMPRESSABLE,
1835		ResponseParameters: respParam,
1836		Payload:            smallPayload,
1837	}
1838	stream, err := tc.FullDuplexCall(te1.ctx)
1839	if err != nil {
1840		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1841	}
1842	if err = stream.Send(sreq); err != nil {
1843		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1844	}
1845	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1846		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1847	}
1848
1849	// Test for streaming RPC send.
1850	respParam[0].Size = int32(smallSize)
1851	sreq.Payload = extraLargePayload
1852	stream, err = tc.FullDuplexCall(te1.ctx)
1853	if err != nil {
1854		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1855	}
1856	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1857		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1858	}
1859
1860	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1861	te2 := testServiceConfigSetup(t, e)
1862	te2.resolverScheme = r.Scheme()
1863	te2.nonBlockingDial = true
1864	te2.maxClientReceiveMsgSize = newInt(1024)
1865	te2.maxClientSendMsgSize = newInt(1024)
1866
1867	te2.startServer(&testServer{security: e.security})
1868	defer te2.tearDown()
1869	cc2 := te2.clientConn()
1870	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: scjs})
1871	tc = testpb.NewTestServiceClient(cc2)
1872
1873	for {
1874		if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1875			break
1876		}
1877		time.Sleep(time.Millisecond)
1878	}
1879
1880	// Test for unary RPC recv.
1881	req.Payload = smallPayload
1882	req.ResponseSize = int32(largeSize)
1883
1884	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
1885		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1886	}
1887
1888	// Test for unary RPC send.
1889	req.Payload = largePayload
1890	req.ResponseSize = int32(smallSize)
1891	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1892		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1893	}
1894
1895	// Test for streaming RPC recv.
1896	stream, err = tc.FullDuplexCall(te2.ctx)
1897	respParam[0].Size = int32(largeSize)
1898	sreq.Payload = smallPayload
1899	if err != nil {
1900		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1901	}
1902	if err = stream.Send(sreq); err != nil {
1903		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1904	}
1905	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1906		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1907	}
1908
1909	// Test for streaming RPC send.
1910	respParam[0].Size = int32(smallSize)
1911	sreq.Payload = largePayload
1912	stream, err = tc.FullDuplexCall(te2.ctx)
1913	if err != nil {
1914		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1915	}
1916	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1917		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1918	}
1919
1920	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
1921	te3 := testServiceConfigSetup(t, e)
1922	te3.resolverScheme = r.Scheme()
1923	te3.nonBlockingDial = true
1924	te3.maxClientReceiveMsgSize = newInt(4096)
1925	te3.maxClientSendMsgSize = newInt(4096)
1926
1927	te3.startServer(&testServer{security: e.security})
1928	defer te3.tearDown()
1929
1930	cc3 := te3.clientConn()
1931	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: scjs})
1932	tc = testpb.NewTestServiceClient(cc3)
1933
1934	for {
1935		if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1936			break
1937		}
1938		time.Sleep(time.Millisecond)
1939	}
1940
1941	// Test for unary RPC recv.
1942	req.Payload = smallPayload
1943	req.ResponseSize = int32(largeSize)
1944
1945	if _, err = tc.UnaryCall(context.Background(), req, grpc.WaitForReady(true)); err != nil {
1946		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1947	}
1948
1949	req.ResponseSize = int32(extraLargeSize)
1950	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1951		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1952	}
1953
1954	// Test for unary RPC send.
1955	req.Payload = largePayload
1956	req.ResponseSize = int32(smallSize)
1957	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
1958		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1959	}
1960
1961	req.Payload = extraLargePayload
1962	if _, err = tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
1963		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1964	}
1965
1966	// Test for streaming RPC recv.
1967	stream, err = tc.FullDuplexCall(te3.ctx)
1968	if err != nil {
1969		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1970	}
1971	respParam[0].Size = int32(largeSize)
1972	sreq.Payload = smallPayload
1973
1974	if err = stream.Send(sreq); err != nil {
1975		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1976	}
1977	if _, err = stream.Recv(); err != nil {
1978		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
1979	}
1980
1981	respParam[0].Size = int32(extraLargeSize)
1982
1983	if err = stream.Send(sreq); err != nil {
1984		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1985	}
1986	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1987		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1988	}
1989
1990	// Test for streaming RPC send.
1991	respParam[0].Size = int32(smallSize)
1992	sreq.Payload = largePayload
1993	stream, err = tc.FullDuplexCall(te3.ctx)
1994	if err != nil {
1995		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1996	}
1997	if err := stream.Send(sreq); err != nil {
1998		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1999	}
2000	sreq.Payload = extraLargePayload
2001	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
2002		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
2003	}
2004}
2005
2006// Reading from a streaming RPC may fail with context canceled if timeout was
2007// set by service config (https://github.com/grpc/grpc-go/issues/1818). This
2008// test makes sure read from streaming RPC doesn't fail in this case.
2009func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
2010	te := testServiceConfigSetup(t, tcpClearRREnv)
2011	te.startServer(&testServer{security: tcpClearRREnv.security})
2012	defer te.tearDown()
2013	r, rcleanup := manual.GenerateAndRegisterManualResolver()
2014	defer rcleanup()
2015
2016	te.resolverScheme = r.Scheme()
2017	te.nonBlockingDial = true
2018	cc := te.clientConn()
2019	tc := testpb.NewTestServiceClient(cc)
2020
2021	r.UpdateState(resolver.State{
2022		Addresses: []resolver.Address{{Addr: te.srvAddr}},
2023		ServiceConfig: `{
2024	    "methodConfig": [
2025	        {
2026	            "name": [
2027	                {
2028	                    "service": "grpc.testing.TestService",
2029	                    "method": "FullDuplexCall"
2030	                }
2031	            ],
2032	            "waitForReady": true,
2033	            "timeout": "10s"
2034	        }
2035	    ]
2036	}`})
2037	// Make sure service config has been processed by grpc.
2038	for {
2039		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
2040			break
2041		}
2042		time.Sleep(time.Millisecond)
2043	}
2044
2045	ctx, cancel := context.WithCancel(context.Background())
2046	defer cancel()
2047	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
2048	if err != nil {
2049		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
2050	}
2051
2052	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0)
2053	if err != nil {
2054		t.Fatalf("failed to newPayload: %v", err)
2055	}
2056	req := &testpb.StreamingOutputCallRequest{
2057		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2058		ResponseParameters: []*testpb.ResponseParameters{{Size: 0}},
2059		Payload:            payload,
2060	}
2061	if err := stream.Send(req); err != nil {
2062		t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err)
2063	}
2064	stream.CloseSend()
2065	time.Sleep(time.Second)
2066	// Sleep 1 second before recv to make sure the final status is received
2067	// before the recv.
2068	if _, err := stream.Recv(); err != nil {
2069		t.Fatalf("stream.Recv = _, %v, want _, <nil>", err)
2070	}
2071	// Keep reading to drain the stream.
2072	for {
2073		if _, err := stream.Recv(); err != nil {
2074			break
2075		}
2076	}
2077}
2078
2079func (s) TestMaxMsgSizeClientDefault(t *testing.T) {
2080	for _, e := range listTestEnv() {
2081		testMaxMsgSizeClientDefault(t, e)
2082	}
2083}
2084
2085func testMaxMsgSizeClientDefault(t *testing.T, e env) {
2086	te := newTest(t, e)
2087	te.userAgent = testAppUA
2088	te.declareLogNoise(
2089		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2090		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2091		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2092		"Failed to dial : context canceled; please retry.",
2093	)
2094	te.startServer(&testServer{security: e.security})
2095
2096	defer te.tearDown()
2097	tc := testpb.NewTestServiceClient(te.clientConn())
2098
2099	const smallSize = 1
2100	const largeSize = 4 * 1024 * 1024
2101	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2102	if err != nil {
2103		t.Fatal(err)
2104	}
2105	req := &testpb.SimpleRequest{
2106		ResponseType: testpb.PayloadType_COMPRESSABLE,
2107		ResponseSize: int32(largeSize),
2108		Payload:      smallPayload,
2109	}
2110	// Test for unary RPC recv.
2111	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2112		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2113	}
2114
2115	respParam := []*testpb.ResponseParameters{
2116		{
2117			Size: int32(largeSize),
2118		},
2119	}
2120	sreq := &testpb.StreamingOutputCallRequest{
2121		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2122		ResponseParameters: respParam,
2123		Payload:            smallPayload,
2124	}
2125
2126	// Test for streaming RPC recv.
2127	stream, err := tc.FullDuplexCall(te.ctx)
2128	if err != nil {
2129		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2130	}
2131	if err := stream.Send(sreq); err != nil {
2132		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2133	}
2134	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2135		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2136	}
2137}
2138
2139func (s) TestMaxMsgSizeClientAPI(t *testing.T) {
2140	for _, e := range listTestEnv() {
2141		testMaxMsgSizeClientAPI(t, e)
2142	}
2143}
2144
2145func testMaxMsgSizeClientAPI(t *testing.T, e env) {
2146	te := newTest(t, e)
2147	te.userAgent = testAppUA
2148	// To avoid error on server side.
2149	te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
2150	te.maxClientReceiveMsgSize = newInt(1024)
2151	te.maxClientSendMsgSize = newInt(1024)
2152	te.declareLogNoise(
2153		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2154		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2155		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2156		"Failed to dial : context canceled; please retry.",
2157	)
2158	te.startServer(&testServer{security: e.security})
2159
2160	defer te.tearDown()
2161	tc := testpb.NewTestServiceClient(te.clientConn())
2162
2163	const smallSize = 1
2164	const largeSize = 1024
2165	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2166	if err != nil {
2167		t.Fatal(err)
2168	}
2169
2170	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2171	if err != nil {
2172		t.Fatal(err)
2173	}
2174	req := &testpb.SimpleRequest{
2175		ResponseType: testpb.PayloadType_COMPRESSABLE,
2176		ResponseSize: int32(largeSize),
2177		Payload:      smallPayload,
2178	}
2179	// Test for unary RPC recv.
2180	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2181		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2182	}
2183
2184	// Test for unary RPC send.
2185	req.Payload = largePayload
2186	req.ResponseSize = int32(smallSize)
2187	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2188		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2189	}
2190
2191	respParam := []*testpb.ResponseParameters{
2192		{
2193			Size: int32(largeSize),
2194		},
2195	}
2196	sreq := &testpb.StreamingOutputCallRequest{
2197		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2198		ResponseParameters: respParam,
2199		Payload:            smallPayload,
2200	}
2201
2202	// Test for streaming RPC recv.
2203	stream, err := tc.FullDuplexCall(te.ctx)
2204	if err != nil {
2205		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2206	}
2207	if err := stream.Send(sreq); err != nil {
2208		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2209	}
2210	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2211		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2212	}
2213
2214	// Test for streaming RPC send.
2215	respParam[0].Size = int32(smallSize)
2216	sreq.Payload = largePayload
2217	stream, err = tc.FullDuplexCall(te.ctx)
2218	if err != nil {
2219		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2220	}
2221	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
2222		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
2223	}
2224}
2225
2226func (s) TestMaxMsgSizeServerAPI(t *testing.T) {
2227	for _, e := range listTestEnv() {
2228		testMaxMsgSizeServerAPI(t, e)
2229	}
2230}
2231
2232func testMaxMsgSizeServerAPI(t *testing.T, e env) {
2233	te := newTest(t, e)
2234	te.userAgent = testAppUA
2235	te.maxServerReceiveMsgSize = newInt(1024)
2236	te.maxServerSendMsgSize = newInt(1024)
2237	te.declareLogNoise(
2238		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2239		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2240		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2241		"Failed to dial : context canceled; please retry.",
2242	)
2243	te.startServer(&testServer{security: e.security})
2244
2245	defer te.tearDown()
2246	tc := testpb.NewTestServiceClient(te.clientConn())
2247
2248	const smallSize = 1
2249	const largeSize = 1024
2250	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2251	if err != nil {
2252		t.Fatal(err)
2253	}
2254
2255	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2256	if err != nil {
2257		t.Fatal(err)
2258	}
2259	req := &testpb.SimpleRequest{
2260		ResponseType: testpb.PayloadType_COMPRESSABLE,
2261		ResponseSize: int32(largeSize),
2262		Payload:      smallPayload,
2263	}
2264	// Test for unary RPC send.
2265	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2266		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2267	}
2268
2269	// Test for unary RPC recv.
2270	req.Payload = largePayload
2271	req.ResponseSize = int32(smallSize)
2272	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2273		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2274	}
2275
2276	respParam := []*testpb.ResponseParameters{
2277		{
2278			Size: int32(largeSize),
2279		},
2280	}
2281	sreq := &testpb.StreamingOutputCallRequest{
2282		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2283		ResponseParameters: respParam,
2284		Payload:            smallPayload,
2285	}
2286
2287	// Test for streaming RPC send.
2288	stream, err := tc.FullDuplexCall(te.ctx)
2289	if err != nil {
2290		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2291	}
2292	if err := stream.Send(sreq); err != nil {
2293		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2294	}
2295	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2296		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2297	}
2298
2299	// Test for streaming RPC recv.
2300	respParam[0].Size = int32(smallSize)
2301	sreq.Payload = largePayload
2302	stream, err = tc.FullDuplexCall(te.ctx)
2303	if err != nil {
2304		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2305	}
2306	if err := stream.Send(sreq); err != nil {
2307		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2308	}
2309	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2310		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2311	}
2312}
2313
2314func (s) TestTap(t *testing.T) {
2315	for _, e := range listTestEnv() {
2316		if e.name == "handler-tls" {
2317			continue
2318		}
2319		testTap(t, e)
2320	}
2321}
2322
2323type myTap struct {
2324	cnt int
2325}
2326
2327func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) {
2328	if info != nil {
2329		if info.FullMethodName == "/grpc.testing.TestService/EmptyCall" {
2330			t.cnt++
2331		} else if info.FullMethodName == "/grpc.testing.TestService/UnaryCall" {
2332			return nil, fmt.Errorf("tap error")
2333		}
2334	}
2335	return ctx, nil
2336}
2337
2338func testTap(t *testing.T, e env) {
2339	te := newTest(t, e)
2340	te.userAgent = testAppUA
2341	ttap := &myTap{}
2342	te.tapHandle = ttap.handle
2343	te.declareLogNoise(
2344		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
2345		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
2346		"grpc: addrConn.resetTransport failed to create client transport: connection error",
2347	)
2348	te.startServer(&testServer{security: e.security})
2349	defer te.tearDown()
2350
2351	cc := te.clientConn()
2352	tc := testpb.NewTestServiceClient(cc)
2353	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
2354		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2355	}
2356	if ttap.cnt != 1 {
2357		t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt)
2358	}
2359
2360	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31)
2361	if err != nil {
2362		t.Fatal(err)
2363	}
2364
2365	req := &testpb.SimpleRequest{
2366		ResponseType: testpb.PayloadType_COMPRESSABLE,
2367		ResponseSize: 45,
2368		Payload:      payload,
2369	}
2370	if _, err := tc.UnaryCall(context.Background(), req); status.Code(err) != codes.Unavailable {
2371		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
2372	}
2373}
2374
2375func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
2376	ctx, cancel := context.WithTimeout(context.Background(), d)
2377	defer cancel()
2378	hc := healthgrpc.NewHealthClient(cc)
2379	req := &healthpb.HealthCheckRequest{
2380		Service: serviceName,
2381	}
2382	return hc.Check(ctx, req)
2383}
2384
2385func (s) TestHealthCheckOnSuccess(t *testing.T) {
2386	for _, e := range listTestEnv() {
2387		testHealthCheckOnSuccess(t, e)
2388	}
2389}
2390
2391func testHealthCheckOnSuccess(t *testing.T, e env) {
2392	te := newTest(t, e)
2393	hs := health.NewServer()
2394	hs.SetServingStatus("grpc.health.v1.Health", 1)
2395	te.healthServer = hs
2396	te.startServer(&testServer{security: e.security})
2397	defer te.tearDown()
2398
2399	cc := te.clientConn()
2400	if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); err != nil {
2401		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2402	}
2403}
2404
2405func (s) TestHealthCheckOnFailure(t *testing.T) {
2406	for _, e := range listTestEnv() {
2407		testHealthCheckOnFailure(t, e)
2408	}
2409}
2410
2411func testHealthCheckOnFailure(t *testing.T, e env) {
2412	te := newTest(t, e)
2413	te.declareLogNoise(
2414		"Failed to dial ",
2415		"grpc: the client connection is closing; please retry",
2416	)
2417	hs := health.NewServer()
2418	hs.SetServingStatus("grpc.health.v1.HealthCheck", 1)
2419	te.healthServer = hs
2420	te.startServer(&testServer{security: e.security})
2421	defer te.tearDown()
2422
2423	cc := te.clientConn()
2424	wantErr := status.Error(codes.DeadlineExceeded, "context deadline exceeded")
2425	if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2426		t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.DeadlineExceeded)
2427	}
2428	awaitNewConnLogOutput()
2429}
2430
2431func (s) TestHealthCheckOff(t *testing.T) {
2432	for _, e := range listTestEnv() {
2433		// TODO(bradfitz): Temporarily skip this env due to #619.
2434		if e.name == "handler-tls" {
2435			continue
2436		}
2437		testHealthCheckOff(t, e)
2438	}
2439}
2440
2441func testHealthCheckOff(t *testing.T, e env) {
2442	te := newTest(t, e)
2443	te.startServer(&testServer{security: e.security})
2444	defer te.tearDown()
2445	want := status.Error(codes.Unimplemented, "unknown service grpc.health.v1.Health")
2446	if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2447		t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
2448	}
2449}
2450
2451func (s) TestHealthWatchMultipleClients(t *testing.T) {
2452	for _, e := range listTestEnv() {
2453		testHealthWatchMultipleClients(t, e)
2454	}
2455}
2456
2457func testHealthWatchMultipleClients(t *testing.T, e env) {
2458	const service = "grpc.health.v1.Health1"
2459
2460	hs := health.NewServer()
2461
2462	te := newTest(t, e)
2463	te.healthServer = hs
2464	te.startServer(&testServer{security: e.security})
2465	defer te.tearDown()
2466
2467	cc := te.clientConn()
2468	hc := healthgrpc.NewHealthClient(cc)
2469
2470	ctx, cancel := context.WithCancel(context.Background())
2471	defer cancel()
2472
2473	req := &healthpb.HealthCheckRequest{
2474		Service: service,
2475	}
2476
2477	stream1, err := hc.Watch(ctx, req)
2478	if err != nil {
2479		t.Fatalf("error: %v", err)
2480	}
2481
2482	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2483
2484	stream2, err := hc.Watch(ctx, req)
2485	if err != nil {
2486		t.Fatalf("error: %v", err)
2487	}
2488
2489	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2490
2491	hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
2492
2493	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
2494	healthWatchChecker(t, stream2, healthpb.HealthCheckResponse_NOT_SERVING)
2495}
2496
2497func (s) TestHealthWatchSameStatus(t *testing.T) {
2498	for _, e := range listTestEnv() {
2499		testHealthWatchSameStatus(t, e)
2500	}
2501}
2502
2503func testHealthWatchSameStatus(t *testing.T, e env) {
2504	const service = "grpc.health.v1.Health1"
2505
2506	hs := health.NewServer()
2507
2508	te := newTest(t, e)
2509	te.healthServer = hs
2510	te.startServer(&testServer{security: e.security})
2511	defer te.tearDown()
2512
2513	cc := te.clientConn()
2514	hc := healthgrpc.NewHealthClient(cc)
2515
2516	ctx, cancel := context.WithCancel(context.Background())
2517	defer cancel()
2518
2519	req := &healthpb.HealthCheckRequest{
2520		Service: service,
2521	}
2522
2523	stream1, err := hc.Watch(ctx, req)
2524	if err != nil {
2525		t.Fatalf("error: %v", err)
2526	}
2527
2528	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2529
2530	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2531
2532	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_SERVING)
2533
2534	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2535	hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
2536
2537	healthWatchChecker(t, stream1, healthpb.HealthCheckResponse_NOT_SERVING)
2538}
2539
2540func (s) TestHealthWatchServiceStatusSetBeforeStartingServer(t *testing.T) {
2541	for _, e := range listTestEnv() {
2542		testHealthWatchSetServiceStatusBeforeStartingServer(t, e)
2543	}
2544}
2545
2546func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) {
2547	const service = "grpc.health.v1.Health1"
2548
2549	hs := health.NewServer()
2550
2551	te := newTest(t, e)
2552	te.healthServer = hs
2553
2554	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2555
2556	te.startServer(&testServer{security: e.security})
2557	defer te.tearDown()
2558
2559	cc := te.clientConn()
2560	hc := healthgrpc.NewHealthClient(cc)
2561
2562	ctx, cancel := context.WithCancel(context.Background())
2563	defer cancel()
2564
2565	req := &healthpb.HealthCheckRequest{
2566		Service: service,
2567	}
2568
2569	stream, err := hc.Watch(ctx, req)
2570	if err != nil {
2571		t.Fatalf("error: %v", err)
2572	}
2573
2574	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2575}
2576
2577func (s) TestHealthWatchDefaultStatusChange(t *testing.T) {
2578	for _, e := range listTestEnv() {
2579		testHealthWatchDefaultStatusChange(t, e)
2580	}
2581}
2582
2583func testHealthWatchDefaultStatusChange(t *testing.T, e env) {
2584	const service = "grpc.health.v1.Health1"
2585
2586	hs := health.NewServer()
2587
2588	te := newTest(t, e)
2589	te.healthServer = hs
2590	te.startServer(&testServer{security: e.security})
2591	defer te.tearDown()
2592
2593	cc := te.clientConn()
2594	hc := healthgrpc.NewHealthClient(cc)
2595
2596	ctx, cancel := context.WithCancel(context.Background())
2597	defer cancel()
2598
2599	req := &healthpb.HealthCheckRequest{
2600		Service: service,
2601	}
2602
2603	stream, err := hc.Watch(ctx, req)
2604	if err != nil {
2605		t.Fatalf("error: %v", err)
2606	}
2607
2608	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVICE_UNKNOWN)
2609
2610	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2611
2612	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2613}
2614
2615func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) {
2616	for _, e := range listTestEnv() {
2617		testHealthWatchSetServiceStatusBeforeClientCallsWatch(t, e)
2618	}
2619}
2620
2621func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) {
2622	const service = "grpc.health.v1.Health1"
2623
2624	hs := health.NewServer()
2625
2626	te := newTest(t, e)
2627	te.healthServer = hs
2628	te.startServer(&testServer{security: e.security})
2629	defer te.tearDown()
2630
2631	cc := te.clientConn()
2632	hc := healthgrpc.NewHealthClient(cc)
2633
2634	ctx, cancel := context.WithCancel(context.Background())
2635	defer cancel()
2636
2637	req := &healthpb.HealthCheckRequest{
2638		Service: service,
2639	}
2640
2641	hs.SetServingStatus(service, healthpb.HealthCheckResponse_SERVING)
2642
2643	stream, err := hc.Watch(ctx, req)
2644	if err != nil {
2645		t.Fatalf("error: %v", err)
2646	}
2647
2648	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2649}
2650
2651func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) {
2652	for _, e := range listTestEnv() {
2653		testHealthWatchOverallServerHealthChange(t, e)
2654	}
2655}
2656
2657func testHealthWatchOverallServerHealthChange(t *testing.T, e env) {
2658	const service = ""
2659
2660	hs := health.NewServer()
2661
2662	te := newTest(t, e)
2663	te.healthServer = hs
2664	te.startServer(&testServer{security: e.security})
2665	defer te.tearDown()
2666
2667	cc := te.clientConn()
2668	hc := healthgrpc.NewHealthClient(cc)
2669
2670	ctx, cancel := context.WithCancel(context.Background())
2671	defer cancel()
2672
2673	req := &healthpb.HealthCheckRequest{
2674		Service: service,
2675	}
2676
2677	stream, err := hc.Watch(ctx, req)
2678	if err != nil {
2679		t.Fatalf("error: %v", err)
2680	}
2681
2682	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_SERVING)
2683
2684	hs.SetServingStatus(service, healthpb.HealthCheckResponse_NOT_SERVING)
2685
2686	healthWatchChecker(t, stream, healthpb.HealthCheckResponse_NOT_SERVING)
2687}
2688
2689func healthWatchChecker(t *testing.T, stream healthgrpc.Health_WatchClient, expectedServingStatus healthpb.HealthCheckResponse_ServingStatus) {
2690	response, err := stream.Recv()
2691	if err != nil {
2692		t.Fatalf("error on %v.Recv(): %v", stream, err)
2693	}
2694	if response.Status != expectedServingStatus {
2695		t.Fatalf("response.Status is %v (%v expected)", response.Status, expectedServingStatus)
2696	}
2697}
2698
2699func (s) TestUnknownHandler(t *testing.T) {
2700	// An example unknownHandler that returns a different code and a different method, making sure that we do not
2701	// expose what methods are implemented to a client that is not authenticated.
2702	unknownHandler := func(srv interface{}, stream grpc.ServerStream) error {
2703		return status.Error(codes.Unauthenticated, "user unauthenticated")
2704	}
2705	for _, e := range listTestEnv() {
2706		// TODO(bradfitz): Temporarily skip this env due to #619.
2707		if e.name == "handler-tls" {
2708			continue
2709		}
2710		testUnknownHandler(t, e, unknownHandler)
2711	}
2712}
2713
2714func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
2715	te := newTest(t, e)
2716	te.unknownHandler = unknownHandler
2717	te.startServer(&testServer{security: e.security})
2718	defer te.tearDown()
2719	want := status.Error(codes.Unauthenticated, "user unauthenticated")
2720	if _, err := healthCheck(1*time.Second, te.clientConn(), ""); !reflect.DeepEqual(err, want) {
2721		t.Fatalf("Health/Check(_, _) = _, %v, want _, %v", err, want)
2722	}
2723}
2724
2725func (s) TestHealthCheckServingStatus(t *testing.T) {
2726	for _, e := range listTestEnv() {
2727		testHealthCheckServingStatus(t, e)
2728	}
2729}
2730
2731func testHealthCheckServingStatus(t *testing.T, e env) {
2732	te := newTest(t, e)
2733	hs := health.NewServer()
2734	te.healthServer = hs
2735	te.startServer(&testServer{security: e.security})
2736	defer te.tearDown()
2737
2738	cc := te.clientConn()
2739	out, err := healthCheck(1*time.Second, cc, "")
2740	if err != nil {
2741		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2742	}
2743	if out.Status != healthpb.HealthCheckResponse_SERVING {
2744		t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2745	}
2746	wantErr := status.Error(codes.NotFound, "unknown service")
2747	if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1.Health"); !reflect.DeepEqual(err, wantErr) {
2748		t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %s", err, codes.NotFound)
2749	}
2750	hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_SERVING)
2751	out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2752	if err != nil {
2753		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2754	}
2755	if out.Status != healthpb.HealthCheckResponse_SERVING {
2756		t.Fatalf("Got the serving status %v, want SERVING", out.Status)
2757	}
2758	hs.SetServingStatus("grpc.health.v1.Health", healthpb.HealthCheckResponse_NOT_SERVING)
2759	out, err = healthCheck(1*time.Second, cc, "grpc.health.v1.Health")
2760	if err != nil {
2761		t.Fatalf("Health/Check(_, _) = _, %v, want _, <nil>", err)
2762	}
2763	if out.Status != healthpb.HealthCheckResponse_NOT_SERVING {
2764		t.Fatalf("Got the serving status %v, want NOT_SERVING", out.Status)
2765	}
2766
2767}
2768
2769func (s) TestEmptyUnaryWithUserAgent(t *testing.T) {
2770	for _, e := range listTestEnv() {
2771		testEmptyUnaryWithUserAgent(t, e)
2772	}
2773}
2774
2775func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
2776	te := newTest(t, e)
2777	te.userAgent = testAppUA
2778	te.startServer(&testServer{security: e.security})
2779	defer te.tearDown()
2780
2781	cc := te.clientConn()
2782	tc := testpb.NewTestServiceClient(cc)
2783	var header metadata.MD
2784	reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header))
2785	if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
2786		t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
2787	}
2788	if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
2789		t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
2790	}
2791
2792	te.srv.Stop()
2793}
2794
2795func (s) TestFailedEmptyUnary(t *testing.T) {
2796	for _, e := range listTestEnv() {
2797		if e.name == "handler-tls" {
2798			// This test covers status details, but
2799			// Grpc-Status-Details-Bin is not support in handler_server.
2800			continue
2801		}
2802		testFailedEmptyUnary(t, e)
2803	}
2804}
2805
2806func testFailedEmptyUnary(t *testing.T, e env) {
2807	te := newTest(t, e)
2808	te.userAgent = failAppUA
2809	te.startServer(&testServer{security: e.security})
2810	defer te.tearDown()
2811	tc := testpb.NewTestServiceClient(te.clientConn())
2812
2813	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
2814	wantErr := detailedError
2815	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) {
2816		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
2817	}
2818}
2819
2820func (s) TestLargeUnary(t *testing.T) {
2821	for _, e := range listTestEnv() {
2822		testLargeUnary(t, e)
2823	}
2824}
2825
2826func testLargeUnary(t *testing.T, e env) {
2827	te := newTest(t, e)
2828	te.startServer(&testServer{security: e.security})
2829	defer te.tearDown()
2830	tc := testpb.NewTestServiceClient(te.clientConn())
2831
2832	const argSize = 271828
2833	const respSize = 314159
2834
2835	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2836	if err != nil {
2837		t.Fatal(err)
2838	}
2839
2840	req := &testpb.SimpleRequest{
2841		ResponseType: testpb.PayloadType_COMPRESSABLE,
2842		ResponseSize: respSize,
2843		Payload:      payload,
2844	}
2845	reply, err := tc.UnaryCall(context.Background(), req)
2846	if err != nil {
2847		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2848	}
2849	pt := reply.GetPayload().GetType()
2850	ps := len(reply.GetPayload().GetBody())
2851	if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2852		t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
2853	}
2854}
2855
2856// Test backward-compatibility API for setting msg size limit.
2857func (s) TestExceedMsgLimit(t *testing.T) {
2858	for _, e := range listTestEnv() {
2859		testExceedMsgLimit(t, e)
2860	}
2861}
2862
2863func testExceedMsgLimit(t *testing.T, e env) {
2864	te := newTest(t, e)
2865	te.maxMsgSize = newInt(1024)
2866	te.startServer(&testServer{security: e.security})
2867	defer te.tearDown()
2868	tc := testpb.NewTestServiceClient(te.clientConn())
2869
2870	argSize := int32(*te.maxMsgSize + 1)
2871	const smallSize = 1
2872
2873	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2874	if err != nil {
2875		t.Fatal(err)
2876	}
2877	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2878	if err != nil {
2879		t.Fatal(err)
2880	}
2881
2882	// Test on server side for unary RPC.
2883	req := &testpb.SimpleRequest{
2884		ResponseType: testpb.PayloadType_COMPRESSABLE,
2885		ResponseSize: smallSize,
2886		Payload:      payload,
2887	}
2888	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2889		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2890	}
2891	// Test on client side for unary RPC.
2892	req.ResponseSize = int32(*te.maxMsgSize) + 1
2893	req.Payload = smallPayload
2894	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
2895		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2896	}
2897
2898	// Test on server side for streaming RPC.
2899	stream, err := tc.FullDuplexCall(te.ctx)
2900	if err != nil {
2901		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2902	}
2903	respParam := []*testpb.ResponseParameters{
2904		{
2905			Size: 1,
2906		},
2907	}
2908
2909	spayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(*te.maxMsgSize+1))
2910	if err != nil {
2911		t.Fatal(err)
2912	}
2913
2914	sreq := &testpb.StreamingOutputCallRequest{
2915		ResponseType:       testpb.PayloadType_COMPRESSABLE,
2916		ResponseParameters: respParam,
2917		Payload:            spayload,
2918	}
2919	if err := stream.Send(sreq); err != nil {
2920		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2921	}
2922	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2923		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2924	}
2925
2926	// Test on client side for streaming RPC.
2927	stream, err = tc.FullDuplexCall(te.ctx)
2928	if err != nil {
2929		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2930	}
2931	respParam[0].Size = int32(*te.maxMsgSize) + 1
2932	sreq.Payload = smallPayload
2933	if err := stream.Send(sreq); err != nil {
2934		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2935	}
2936	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2937		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2938	}
2939
2940}
2941
2942func (s) TestPeerClientSide(t *testing.T) {
2943	for _, e := range listTestEnv() {
2944		testPeerClientSide(t, e)
2945	}
2946}
2947
2948func testPeerClientSide(t *testing.T, e env) {
2949	te := newTest(t, e)
2950	te.userAgent = testAppUA
2951	te.startServer(&testServer{security: e.security})
2952	defer te.tearDown()
2953	tc := testpb.NewTestServiceClient(te.clientConn())
2954	peer := new(peer.Peer)
2955	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
2956		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2957	}
2958	pa := peer.Addr.String()
2959	if e.network == "unix" {
2960		if pa != te.srvAddr {
2961			t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2962		}
2963		return
2964	}
2965	_, pp, err := net.SplitHostPort(pa)
2966	if err != nil {
2967		t.Fatalf("Failed to parse address from peer.")
2968	}
2969	_, sp, err := net.SplitHostPort(te.srvAddr)
2970	if err != nil {
2971		t.Fatalf("Failed to parse address of test server.")
2972	}
2973	if pp != sp {
2974		t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2975	}
2976}
2977
2978// TestPeerNegative tests that if call fails setting peer
2979// doesn't cause a segmentation fault.
2980// issue#1141 https://github.com/grpc/grpc-go/issues/1141
2981func (s) TestPeerNegative(t *testing.T) {
2982	for _, e := range listTestEnv() {
2983		testPeerNegative(t, e)
2984	}
2985}
2986
2987func testPeerNegative(t *testing.T, e env) {
2988	te := newTest(t, e)
2989	te.startServer(&testServer{security: e.security})
2990	defer te.tearDown()
2991
2992	cc := te.clientConn()
2993	tc := testpb.NewTestServiceClient(cc)
2994	peer := new(peer.Peer)
2995	ctx, cancel := context.WithCancel(context.Background())
2996	cancel()
2997	tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
2998}
2999
3000func (s) TestPeerFailedRPC(t *testing.T) {
3001	for _, e := range listTestEnv() {
3002		testPeerFailedRPC(t, e)
3003	}
3004}
3005
3006func testPeerFailedRPC(t *testing.T, e env) {
3007	te := newTest(t, e)
3008	te.maxServerReceiveMsgSize = newInt(1 * 1024)
3009	te.startServer(&testServer{security: e.security})
3010
3011	defer te.tearDown()
3012	tc := testpb.NewTestServiceClient(te.clientConn())
3013
3014	// first make a successful request to the server
3015	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
3016		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
3017	}
3018
3019	// make a second request that will be rejected by the server
3020	const largeSize = 5 * 1024
3021	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
3022	if err != nil {
3023		t.Fatal(err)
3024	}
3025	req := &testpb.SimpleRequest{
3026		ResponseType: testpb.PayloadType_COMPRESSABLE,
3027		Payload:      largePayload,
3028	}
3029
3030	peer := new(peer.Peer)
3031	if _, err := tc.UnaryCall(context.Background(), req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted {
3032		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
3033	} else {
3034		pa := peer.Addr.String()
3035		if e.network == "unix" {
3036			if pa != te.srvAddr {
3037				t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
3038			}
3039			return
3040		}
3041		_, pp, err := net.SplitHostPort(pa)
3042		if err != nil {
3043			t.Fatalf("Failed to parse address from peer.")
3044		}
3045		_, sp, err := net.SplitHostPort(te.srvAddr)
3046		if err != nil {
3047			t.Fatalf("Failed to parse address of test server.")
3048		}
3049		if pp != sp {
3050			t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
3051		}
3052	}
3053}
3054
3055func (s) TestMetadataUnaryRPC(t *testing.T) {
3056	for _, e := range listTestEnv() {
3057		testMetadataUnaryRPC(t, e)
3058	}
3059}
3060
3061func testMetadataUnaryRPC(t *testing.T, e env) {
3062	te := newTest(t, e)
3063	te.startServer(&testServer{security: e.security})
3064	defer te.tearDown()
3065	tc := testpb.NewTestServiceClient(te.clientConn())
3066
3067	const argSize = 2718
3068	const respSize = 314
3069
3070	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3071	if err != nil {
3072		t.Fatal(err)
3073	}
3074
3075	req := &testpb.SimpleRequest{
3076		ResponseType: testpb.PayloadType_COMPRESSABLE,
3077		ResponseSize: respSize,
3078		Payload:      payload,
3079	}
3080	var header, trailer metadata.MD
3081	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3082	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
3083		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3084	}
3085	// Ignore optional response headers that Servers may set:
3086	if header != nil {
3087		delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
3088		delete(header, "date")    // the Date header is also optional
3089		delete(header, "user-agent")
3090		delete(header, "content-type")
3091	}
3092	if !reflect.DeepEqual(header, testMetadata) {
3093		t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
3094	}
3095	if !reflect.DeepEqual(trailer, testTrailerMetadata) {
3096		t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
3097	}
3098}
3099
3100func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) {
3101	for _, e := range listTestEnv() {
3102		testMultipleSetTrailerUnaryRPC(t, e)
3103	}
3104}
3105
3106func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
3107	te := newTest(t, e)
3108	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
3109	defer te.tearDown()
3110	tc := testpb.NewTestServiceClient(te.clientConn())
3111
3112	const (
3113		argSize  = 1
3114		respSize = 1
3115	)
3116	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3117	if err != nil {
3118		t.Fatal(err)
3119	}
3120
3121	req := &testpb.SimpleRequest{
3122		ResponseType: testpb.PayloadType_COMPRESSABLE,
3123		ResponseSize: respSize,
3124		Payload:      payload,
3125	}
3126	var trailer metadata.MD
3127	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3128	if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil {
3129		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3130	}
3131	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
3132	if !reflect.DeepEqual(trailer, expectedTrailer) {
3133		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
3134	}
3135}
3136
3137func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) {
3138	for _, e := range listTestEnv() {
3139		testMultipleSetTrailerStreamingRPC(t, e)
3140	}
3141}
3142
3143func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
3144	te := newTest(t, e)
3145	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
3146	defer te.tearDown()
3147	tc := testpb.NewTestServiceClient(te.clientConn())
3148
3149	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3150	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
3151	if err != nil {
3152		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3153	}
3154	if err := stream.CloseSend(); err != nil {
3155		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3156	}
3157	if _, err := stream.Recv(); err != io.EOF {
3158		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3159	}
3160
3161	trailer := stream.Trailer()
3162	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
3163	if !reflect.DeepEqual(trailer, expectedTrailer) {
3164		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
3165	}
3166}
3167
3168func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) {
3169	for _, e := range listTestEnv() {
3170		if e.name == "handler-tls" {
3171			continue
3172		}
3173		testSetAndSendHeaderUnaryRPC(t, e)
3174	}
3175}
3176
3177// To test header metadata is sent on SendHeader().
3178func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
3179	te := newTest(t, e)
3180	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
3181	defer te.tearDown()
3182	tc := testpb.NewTestServiceClient(te.clientConn())
3183
3184	const (
3185		argSize  = 1
3186		respSize = 1
3187	)
3188	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3189	if err != nil {
3190		t.Fatal(err)
3191	}
3192
3193	req := &testpb.SimpleRequest{
3194		ResponseType: testpb.PayloadType_COMPRESSABLE,
3195		ResponseSize: respSize,
3196		Payload:      payload,
3197	}
3198	var header metadata.MD
3199	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3200	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
3201		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3202	}
3203	delete(header, "user-agent")
3204	delete(header, "content-type")
3205	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3206	if !reflect.DeepEqual(header, expectedHeader) {
3207		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3208	}
3209}
3210
3211func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) {
3212	for _, e := range listTestEnv() {
3213		if e.name == "handler-tls" {
3214			continue
3215		}
3216		testMultipleSetHeaderUnaryRPC(t, e)
3217	}
3218}
3219
3220// To test header metadata is sent when sending response.
3221func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
3222	te := newTest(t, e)
3223	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3224	defer te.tearDown()
3225	tc := testpb.NewTestServiceClient(te.clientConn())
3226
3227	const (
3228		argSize  = 1
3229		respSize = 1
3230	)
3231	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3232	if err != nil {
3233		t.Fatal(err)
3234	}
3235
3236	req := &testpb.SimpleRequest{
3237		ResponseType: testpb.PayloadType_COMPRESSABLE,
3238		ResponseSize: respSize,
3239		Payload:      payload,
3240	}
3241
3242	var header metadata.MD
3243	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3244	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
3245		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
3246	}
3247	delete(header, "user-agent")
3248	delete(header, "content-type")
3249	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3250	if !reflect.DeepEqual(header, expectedHeader) {
3251		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3252	}
3253}
3254
3255func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
3256	for _, e := range listTestEnv() {
3257		if e.name == "handler-tls" {
3258			continue
3259		}
3260		testMultipleSetHeaderUnaryRPCError(t, e)
3261	}
3262}
3263
3264// To test header metadata is sent when sending status.
3265func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
3266	te := newTest(t, e)
3267	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3268	defer te.tearDown()
3269	tc := testpb.NewTestServiceClient(te.clientConn())
3270
3271	const (
3272		argSize  = 1
3273		respSize = -1 // Invalid respSize to make RPC fail.
3274	)
3275	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3276	if err != nil {
3277		t.Fatal(err)
3278	}
3279
3280	req := &testpb.SimpleRequest{
3281		ResponseType: testpb.PayloadType_COMPRESSABLE,
3282		ResponseSize: respSize,
3283		Payload:      payload,
3284	}
3285	var header metadata.MD
3286	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3287	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil {
3288		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
3289	}
3290	delete(header, "user-agent")
3291	delete(header, "content-type")
3292	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3293	if !reflect.DeepEqual(header, expectedHeader) {
3294		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3295	}
3296}
3297
3298func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) {
3299	for _, e := range listTestEnv() {
3300		if e.name == "handler-tls" {
3301			continue
3302		}
3303		testSetAndSendHeaderStreamingRPC(t, e)
3304	}
3305}
3306
3307// To test header metadata is sent on SendHeader().
3308func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
3309	te := newTest(t, e)
3310	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
3311	defer te.tearDown()
3312	tc := testpb.NewTestServiceClient(te.clientConn())
3313
3314	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3315	stream, err := tc.FullDuplexCall(ctx)
3316	if err != nil {
3317		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3318	}
3319	if err := stream.CloseSend(); err != nil {
3320		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3321	}
3322	if _, err := stream.Recv(); err != io.EOF {
3323		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3324	}
3325
3326	header, err := stream.Header()
3327	if err != nil {
3328		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3329	}
3330	delete(header, "user-agent")
3331	delete(header, "content-type")
3332	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3333	if !reflect.DeepEqual(header, expectedHeader) {
3334		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3335	}
3336}
3337
3338func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) {
3339	for _, e := range listTestEnv() {
3340		if e.name == "handler-tls" {
3341			continue
3342		}
3343		testMultipleSetHeaderStreamingRPC(t, e)
3344	}
3345}
3346
3347// To test header metadata is sent when sending response.
3348func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
3349	te := newTest(t, e)
3350	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3351	defer te.tearDown()
3352	tc := testpb.NewTestServiceClient(te.clientConn())
3353
3354	const (
3355		argSize  = 1
3356		respSize = 1
3357	)
3358	ctx := metadata.NewOutgoingContext(context.Background(), testMetadata)
3359	stream, err := tc.FullDuplexCall(ctx)
3360	if err != nil {
3361		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3362	}
3363
3364	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3365	if err != nil {
3366		t.Fatal(err)
3367	}
3368
3369	req := &testpb.StreamingOutputCallRequest{
3370		ResponseType: testpb.PayloadType_COMPRESSABLE,
3371		ResponseParameters: []*testpb.ResponseParameters{
3372			{Size: respSize},
3373		},
3374		Payload: payload,
3375	}
3376	if err := stream.Send(req); err != nil {
3377		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3378	}
3379	if _, err := stream.Recv(); err != nil {
3380		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3381	}
3382	if err := stream.CloseSend(); err != nil {
3383		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3384	}
3385	if _, err := stream.Recv(); err != io.EOF {
3386		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
3387	}
3388
3389	header, err := stream.Header()
3390	if err != nil {
3391		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3392	}
3393	delete(header, "user-agent")
3394	delete(header, "content-type")
3395	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3396	if !reflect.DeepEqual(header, expectedHeader) {
3397		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3398	}
3399
3400}
3401
3402func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
3403	for _, e := range listTestEnv() {
3404		if e.name == "handler-tls" {
3405			continue
3406		}
3407		testMultipleSetHeaderStreamingRPCError(t, e)
3408	}
3409}
3410
3411// To test header metadata is sent when sending status.
3412func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
3413	te := newTest(t, e)
3414	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
3415	defer te.tearDown()
3416	tc := testpb.NewTestServiceClient(te.clientConn())
3417
3418	const (
3419		argSize  = 1
3420		respSize = -1
3421	)
3422	ctx, cancel := context.WithCancel(context.Background())
3423	defer cancel()
3424	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
3425	stream, err := tc.FullDuplexCall(ctx)
3426	if err != nil {
3427		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3428	}
3429
3430	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3431	if err != nil {
3432		t.Fatal(err)
3433	}
3434
3435	req := &testpb.StreamingOutputCallRequest{
3436		ResponseType: testpb.PayloadType_COMPRESSABLE,
3437		ResponseParameters: []*testpb.ResponseParameters{
3438			{Size: respSize},
3439		},
3440		Payload: payload,
3441	}
3442	if err := stream.Send(req); err != nil {
3443		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3444	}
3445	if _, err := stream.Recv(); err == nil {
3446		t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
3447	}
3448
3449	header, err := stream.Header()
3450	if err != nil {
3451		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
3452	}
3453	delete(header, "user-agent")
3454	delete(header, "content-type")
3455	expectedHeader := metadata.Join(testMetadata, testMetadata2)
3456	if !reflect.DeepEqual(header, expectedHeader) {
3457		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
3458	}
3459	if err := stream.CloseSend(); err != nil {
3460		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3461	}
3462}
3463
3464// TestMalformedHTTP2Metedata verfies the returned error when the client
3465// sends an illegal metadata.
3466func (s) TestMalformedHTTP2Metadata(t *testing.T) {
3467	for _, e := range listTestEnv() {
3468		if e.name == "handler-tls" {
3469			// Failed with "server stops accepting new RPCs".
3470			// Server stops accepting new RPCs when the client sends an illegal http2 header.
3471			continue
3472		}
3473		testMalformedHTTP2Metadata(t, e)
3474	}
3475}
3476
3477func testMalformedHTTP2Metadata(t *testing.T, e env) {
3478	te := newTest(t, e)
3479	te.startServer(&testServer{security: e.security})
3480	defer te.tearDown()
3481	tc := testpb.NewTestServiceClient(te.clientConn())
3482
3483	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718)
3484	if err != nil {
3485		t.Fatal(err)
3486	}
3487
3488	req := &testpb.SimpleRequest{
3489		ResponseType: testpb.PayloadType_COMPRESSABLE,
3490		ResponseSize: 314,
3491		Payload:      payload,
3492	}
3493	ctx := metadata.NewOutgoingContext(context.Background(), malformedHTTP2Metadata)
3494	if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal {
3495		t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
3496	}
3497}
3498
3499func (s) TestTransparentRetry(t *testing.T) {
3500	for _, e := range listTestEnv() {
3501		if e.name == "handler-tls" {
3502			// Fails with RST_STREAM / FLOW_CONTROL_ERROR
3503			continue
3504		}
3505		testTransparentRetry(t, e)
3506	}
3507}
3508
3509// This test makes sure RPCs are retried times when they receive a RST_STREAM
3510// with the REFUSED_STREAM error code, which the InTapHandle provokes.
3511func testTransparentRetry(t *testing.T, e env) {
3512	te := newTest(t, e)
3513	attempts := 0
3514	successAttempt := 2
3515	te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
3516		attempts++
3517		if attempts < successAttempt {
3518			return nil, errors.New("not now")
3519		}
3520		return ctx, nil
3521	}
3522	te.startServer(&testServer{security: e.security})
3523	defer te.tearDown()
3524
3525	cc := te.clientConn()
3526	tsc := testpb.NewTestServiceClient(cc)
3527	testCases := []struct {
3528		successAttempt int
3529		failFast       bool
3530		errCode        codes.Code
3531	}{{
3532		successAttempt: 1,
3533	}, {
3534		successAttempt: 2,
3535	}, {
3536		successAttempt: 3,
3537		errCode:        codes.Unavailable,
3538	}, {
3539		successAttempt: 1,
3540		failFast:       true,
3541	}, {
3542		successAttempt: 2,
3543		failFast:       true,
3544		errCode:        codes.Unavailable, // We won't retry on fail fast.
3545	}}
3546	for _, tc := range testCases {
3547		attempts = 0
3548		successAttempt = tc.successAttempt
3549
3550		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
3551		_, err := tsc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(!tc.failFast))
3552		cancel()
3553		if status.Code(err) != tc.errCode {
3554			t.Errorf("%+v: tsc.EmptyCall(_, _) = _, %v, want _, Code=%v", tc, err, tc.errCode)
3555		}
3556	}
3557}
3558
3559func (s) TestCancel(t *testing.T) {
3560	for _, e := range listTestEnv() {
3561		testCancel(t, e)
3562	}
3563}
3564
3565func testCancel(t *testing.T, e env) {
3566	te := newTest(t, e)
3567	te.declareLogNoise("grpc: the client connection is closing; please retry")
3568	te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
3569	defer te.tearDown()
3570
3571	cc := te.clientConn()
3572	tc := testpb.NewTestServiceClient(cc)
3573
3574	const argSize = 2718
3575	const respSize = 314
3576
3577	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3578	if err != nil {
3579		t.Fatal(err)
3580	}
3581
3582	req := &testpb.SimpleRequest{
3583		ResponseType: testpb.PayloadType_COMPRESSABLE,
3584		ResponseSize: respSize,
3585		Payload:      payload,
3586	}
3587	ctx, cancel := context.WithCancel(context.Background())
3588	time.AfterFunc(1*time.Millisecond, cancel)
3589	if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled {
3590		t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
3591	}
3592	awaitNewConnLogOutput()
3593}
3594
3595func (s) TestCancelNoIO(t *testing.T) {
3596	for _, e := range listTestEnv() {
3597		testCancelNoIO(t, e)
3598	}
3599}
3600
3601func testCancelNoIO(t *testing.T, e env) {
3602	te := newTest(t, e)
3603	te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
3604	te.maxStream = 1 // Only allows 1 live stream per server transport.
3605	te.startServer(&testServer{security: e.security})
3606	defer te.tearDown()
3607
3608	cc := te.clientConn()
3609	tc := testpb.NewTestServiceClient(cc)
3610
3611	// Start one blocked RPC for which we'll never send streaming
3612	// input. This will consume the 1 maximum concurrent streams,
3613	// causing future RPCs to hang.
3614	ctx, cancelFirst := context.WithCancel(context.Background())
3615	_, err := tc.StreamingInputCall(ctx)
3616	if err != nil {
3617		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3618	}
3619
3620	// Loop until the ClientConn receives the initial settings
3621	// frame from the server, notifying it about the maximum
3622	// concurrent streams. We know when it's received it because
3623	// an RPC will fail with codes.DeadlineExceeded instead of
3624	// succeeding.
3625	// TODO(bradfitz): add internal test hook for this (Issue 534)
3626	for {
3627		ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond)
3628		_, err := tc.StreamingInputCall(ctx)
3629		cancelSecond()
3630		if err == nil {
3631			continue
3632		}
3633		if status.Code(err) == codes.DeadlineExceeded {
3634			break
3635		}
3636		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3637	}
3638	// If there are any RPCs in flight before the client receives
3639	// the max streams setting, let them be expired.
3640	// TODO(bradfitz): add internal test hook for this (Issue 534)
3641	time.Sleep(50 * time.Millisecond)
3642
3643	go func() {
3644		time.Sleep(50 * time.Millisecond)
3645		cancelFirst()
3646	}()
3647
3648	// This should be blocked until the 1st is canceled, then succeed.
3649	ctx, cancelThird := context.WithTimeout(context.Background(), 500*time.Millisecond)
3650	if _, err := tc.StreamingInputCall(ctx); err != nil {
3651		t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3652	}
3653	cancelThird()
3654}
3655
3656// The following tests the gRPC streaming RPC implementations.
3657// TODO(zhaoq): Have better coverage on error cases.
3658var (
3659	reqSizes  = []int{27182, 8, 1828, 45904}
3660	respSizes = []int{31415, 9, 2653, 58979}
3661)
3662
3663func (s) TestNoService(t *testing.T) {
3664	for _, e := range listTestEnv() {
3665		testNoService(t, e)
3666	}
3667}
3668
3669func testNoService(t *testing.T, e env) {
3670	te := newTest(t, e)
3671	te.startServer(nil)
3672	defer te.tearDown()
3673
3674	cc := te.clientConn()
3675	tc := testpb.NewTestServiceClient(cc)
3676
3677	stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true))
3678	if err != nil {
3679		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3680	}
3681	if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented {
3682		t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
3683	}
3684}
3685
3686func (s) TestPingPong(t *testing.T) {
3687	for _, e := range listTestEnv() {
3688		testPingPong(t, e)
3689	}
3690}
3691
3692func testPingPong(t *testing.T, e env) {
3693	te := newTest(t, e)
3694	te.startServer(&testServer{security: e.security})
3695	defer te.tearDown()
3696	tc := testpb.NewTestServiceClient(te.clientConn())
3697
3698	stream, err := tc.FullDuplexCall(te.ctx)
3699	if err != nil {
3700		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3701	}
3702	var index int
3703	for index < len(reqSizes) {
3704		respParam := []*testpb.ResponseParameters{
3705			{
3706				Size: int32(respSizes[index]),
3707			},
3708		}
3709
3710		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3711		if err != nil {
3712			t.Fatal(err)
3713		}
3714
3715		req := &testpb.StreamingOutputCallRequest{
3716			ResponseType:       testpb.PayloadType_COMPRESSABLE,
3717			ResponseParameters: respParam,
3718			Payload:            payload,
3719		}
3720		if err := stream.Send(req); err != nil {
3721			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3722		}
3723		reply, err := stream.Recv()
3724		if err != nil {
3725			t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3726		}
3727		pt := reply.GetPayload().GetType()
3728		if pt != testpb.PayloadType_COMPRESSABLE {
3729			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3730		}
3731		size := len(reply.GetPayload().GetBody())
3732		if size != int(respSizes[index]) {
3733			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3734		}
3735		index++
3736	}
3737	if err := stream.CloseSend(); err != nil {
3738		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3739	}
3740	if _, err := stream.Recv(); err != io.EOF {
3741		t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
3742	}
3743}
3744
3745func (s) TestMetadataStreamingRPC(t *testing.T) {
3746	for _, e := range listTestEnv() {
3747		testMetadataStreamingRPC(t, e)
3748	}
3749}
3750
3751func testMetadataStreamingRPC(t *testing.T, e env) {
3752	te := newTest(t, e)
3753	te.startServer(&testServer{security: e.security})
3754	defer te.tearDown()
3755	tc := testpb.NewTestServiceClient(te.clientConn())
3756
3757	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3758	stream, err := tc.FullDuplexCall(ctx)
3759	if err != nil {
3760		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3761	}
3762	go func() {
3763		headerMD, err := stream.Header()
3764		if e.security == "tls" {
3765			delete(headerMD, "transport_security_type")
3766		}
3767		delete(headerMD, "trailer") // ignore if present
3768		delete(headerMD, "user-agent")
3769		delete(headerMD, "content-type")
3770		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3771			t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3772		}
3773		// test the cached value.
3774		headerMD, err = stream.Header()
3775		delete(headerMD, "trailer") // ignore if present
3776		delete(headerMD, "user-agent")
3777		delete(headerMD, "content-type")
3778		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3779			t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3780		}
3781		err = func() error {
3782			for index := 0; index < len(reqSizes); index++ {
3783				respParam := []*testpb.ResponseParameters{
3784					{
3785						Size: int32(respSizes[index]),
3786					},
3787				}
3788
3789				payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3790				if err != nil {
3791					return err
3792				}
3793
3794				req := &testpb.StreamingOutputCallRequest{
3795					ResponseType:       testpb.PayloadType_COMPRESSABLE,
3796					ResponseParameters: respParam,
3797					Payload:            payload,
3798				}
3799				if err := stream.Send(req); err != nil {
3800					return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3801				}
3802			}
3803			return nil
3804		}()
3805		// Tell the server we're done sending args.
3806		stream.CloseSend()
3807		if err != nil {
3808			t.Error(err)
3809		}
3810	}()
3811	for {
3812		if _, err := stream.Recv(); err != nil {
3813			break
3814		}
3815	}
3816	trailerMD := stream.Trailer()
3817	if !reflect.DeepEqual(testTrailerMetadata, trailerMD) {
3818		t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata)
3819	}
3820}
3821
3822func (s) TestServerStreaming(t *testing.T) {
3823	for _, e := range listTestEnv() {
3824		testServerStreaming(t, e)
3825	}
3826}
3827
3828func testServerStreaming(t *testing.T, e env) {
3829	te := newTest(t, e)
3830	te.startServer(&testServer{security: e.security})
3831	defer te.tearDown()
3832	tc := testpb.NewTestServiceClient(te.clientConn())
3833
3834	respParam := make([]*testpb.ResponseParameters, len(respSizes))
3835	for i, s := range respSizes {
3836		respParam[i] = &testpb.ResponseParameters{
3837			Size: int32(s),
3838		}
3839	}
3840	req := &testpb.StreamingOutputCallRequest{
3841		ResponseType:       testpb.PayloadType_COMPRESSABLE,
3842		ResponseParameters: respParam,
3843	}
3844	stream, err := tc.StreamingOutputCall(context.Background(), req)
3845	if err != nil {
3846		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3847	}
3848	var rpcStatus error
3849	var respCnt int
3850	var index int
3851	for {
3852		reply, err := stream.Recv()
3853		if err != nil {
3854			rpcStatus = err
3855			break
3856		}
3857		pt := reply.GetPayload().GetType()
3858		if pt != testpb.PayloadType_COMPRESSABLE {
3859			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3860		}
3861		size := len(reply.GetPayload().GetBody())
3862		if size != int(respSizes[index]) {
3863			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3864		}
3865		index++
3866		respCnt++
3867	}
3868	if rpcStatus != io.EOF {
3869		t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
3870	}
3871	if respCnt != len(respSizes) {
3872		t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
3873	}
3874}
3875
3876func (s) TestFailedServerStreaming(t *testing.T) {
3877	for _, e := range listTestEnv() {
3878		testFailedServerStreaming(t, e)
3879	}
3880}
3881
3882func testFailedServerStreaming(t *testing.T, e env) {
3883	te := newTest(t, e)
3884	te.userAgent = failAppUA
3885	te.startServer(&testServer{security: e.security})
3886	defer te.tearDown()
3887	tc := testpb.NewTestServiceClient(te.clientConn())
3888
3889	respParam := make([]*testpb.ResponseParameters, len(respSizes))
3890	for i, s := range respSizes {
3891		respParam[i] = &testpb.ResponseParameters{
3892			Size: int32(s),
3893		}
3894	}
3895	req := &testpb.StreamingOutputCallRequest{
3896		ResponseType:       testpb.PayloadType_COMPRESSABLE,
3897		ResponseParameters: respParam,
3898	}
3899	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3900	stream, err := tc.StreamingOutputCall(ctx, req)
3901	if err != nil {
3902		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3903	}
3904	wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA)
3905	if _, err := stream.Recv(); !reflect.DeepEqual(err, wantErr) {
3906		t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
3907	}
3908}
3909
3910// concurrentSendServer is a TestServiceServer whose
3911// StreamingOutputCall makes ten serial Send calls, sending payloads
3912// "0".."9", inclusive.  TestServerStreamingConcurrent verifies they
3913// were received in the correct order, and that there were no races.
3914//
3915// All other TestServiceServer methods crash if called.
3916type concurrentSendServer struct {
3917	testpb.TestServiceServer
3918}
3919
3920func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
3921	for i := 0; i < 10; i++ {
3922		stream.Send(&testpb.StreamingOutputCallResponse{
3923			Payload: &testpb.Payload{
3924				Body: []byte{'0' + uint8(i)},
3925			},
3926		})
3927	}
3928	return nil
3929}
3930
3931// Tests doing a bunch of concurrent streaming output calls.
3932func (s) TestServerStreamingConcurrent(t *testing.T) {
3933	for _, e := range listTestEnv() {
3934		testServerStreamingConcurrent(t, e)
3935	}
3936}
3937
3938func testServerStreamingConcurrent(t *testing.T, e env) {
3939	te := newTest(t, e)
3940	te.startServer(concurrentSendServer{})
3941	defer te.tearDown()
3942
3943	cc := te.clientConn()
3944	tc := testpb.NewTestServiceClient(cc)
3945
3946	doStreamingCall := func() {
3947		req := &testpb.StreamingOutputCallRequest{}
3948		stream, err := tc.StreamingOutputCall(context.Background(), req)
3949		if err != nil {
3950			t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3951			return
3952		}
3953		var ngot int
3954		var buf bytes.Buffer
3955		for {
3956			reply, err := stream.Recv()
3957			if err == io.EOF {
3958				break
3959			}
3960			if err != nil {
3961				t.Fatal(err)
3962			}
3963			ngot++
3964			if buf.Len() > 0 {
3965				buf.WriteByte(',')
3966			}
3967			buf.Write(reply.GetPayload().GetBody())
3968		}
3969		if want := 10; ngot != want {
3970			t.Errorf("Got %d replies, want %d", ngot, want)
3971		}
3972		if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
3973			t.Errorf("Got replies %q; want %q", got, want)
3974		}
3975	}
3976
3977	var wg sync.WaitGroup
3978	for i := 0; i < 20; i++ {
3979		wg.Add(1)
3980		go func() {
3981			defer wg.Done()
3982			doStreamingCall()
3983		}()
3984	}
3985	wg.Wait()
3986
3987}
3988
3989func generatePayloadSizes() [][]int {
3990	reqSizes := [][]int{
3991		{27182, 8, 1828, 45904},
3992	}
3993
3994	num8KPayloads := 1024
3995	eightKPayloads := []int{}
3996	for i := 0; i < num8KPayloads; i++ {
3997		eightKPayloads = append(eightKPayloads, (1 << 13))
3998	}
3999	reqSizes = append(reqSizes, eightKPayloads)
4000
4001	num2MPayloads := 8
4002	twoMPayloads := []int{}
4003	for i := 0; i < num2MPayloads; i++ {
4004		twoMPayloads = append(twoMPayloads, (1 << 21))
4005	}
4006	reqSizes = append(reqSizes, twoMPayloads)
4007
4008	return reqSizes
4009}
4010
4011func (s) TestClientStreaming(t *testing.T) {
4012	for _, s := range generatePayloadSizes() {
4013		for _, e := range listTestEnv() {
4014			testClientStreaming(t, e, s)
4015		}
4016	}
4017}
4018
4019func testClientStreaming(t *testing.T, e env, sizes []int) {
4020	te := newTest(t, e)
4021	te.startServer(&testServer{security: e.security})
4022	defer te.tearDown()
4023	tc := testpb.NewTestServiceClient(te.clientConn())
4024
4025	ctx, cancel := context.WithTimeout(te.ctx, time.Second*30)
4026	defer cancel()
4027	stream, err := tc.StreamingInputCall(ctx)
4028	if err != nil {
4029		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
4030	}
4031
4032	var sum int
4033	for _, s := range sizes {
4034		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
4035		if err != nil {
4036			t.Fatal(err)
4037		}
4038
4039		req := &testpb.StreamingInputCallRequest{
4040			Payload: payload,
4041		}
4042		if err := stream.Send(req); err != nil {
4043			t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
4044		}
4045		sum += s
4046	}
4047	reply, err := stream.CloseAndRecv()
4048	if err != nil {
4049		t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
4050	}
4051	if reply.GetAggregatedPayloadSize() != int32(sum) {
4052		t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
4053	}
4054}
4055
4056func (s) TestClientStreamingError(t *testing.T) {
4057	for _, e := range listTestEnv() {
4058		if e.name == "handler-tls" {
4059			continue
4060		}
4061		testClientStreamingError(t, e)
4062	}
4063}
4064
4065func testClientStreamingError(t *testing.T, e env) {
4066	te := newTest(t, e)
4067	te.startServer(&testServer{security: e.security, earlyFail: true})
4068	defer te.tearDown()
4069	tc := testpb.NewTestServiceClient(te.clientConn())
4070
4071	stream, err := tc.StreamingInputCall(te.ctx)
4072	if err != nil {
4073		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
4074	}
4075	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
4076	if err != nil {
4077		t.Fatal(err)
4078	}
4079
4080	req := &testpb.StreamingInputCallRequest{
4081		Payload: payload,
4082	}
4083	// The 1st request should go through.
4084	if err := stream.Send(req); err != nil {
4085		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
4086	}
4087	for {
4088		if err := stream.Send(req); err != io.EOF {
4089			continue
4090		}
4091		if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound {
4092			t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
4093		}
4094		break
4095	}
4096}
4097
4098func (s) TestExceedMaxStreamsLimit(t *testing.T) {
4099	for _, e := range listTestEnv() {
4100		testExceedMaxStreamsLimit(t, e)
4101	}
4102}
4103
4104func testExceedMaxStreamsLimit(t *testing.T, e env) {
4105	te := newTest(t, e)
4106	te.declareLogNoise(
4107		"http2Client.notifyError got notified that the client transport was broken",
4108		"Conn.resetTransport failed to create client transport",
4109		"grpc: the connection is closing",
4110	)
4111	te.maxStream = 1 // Only allows 1 live stream per server transport.
4112	te.startServer(&testServer{security: e.security})
4113	defer te.tearDown()
4114
4115	cc := te.clientConn()
4116	tc := testpb.NewTestServiceClient(cc)
4117
4118	_, err := tc.StreamingInputCall(te.ctx)
4119	if err != nil {
4120		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
4121	}
4122	// Loop until receiving the new max stream setting from the server.
4123	for {
4124		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
4125		defer cancel()
4126		_, err := tc.StreamingInputCall(ctx)
4127		if err == nil {
4128			time.Sleep(50 * time.Millisecond)
4129			continue
4130		}
4131		if status.Code(err) == codes.DeadlineExceeded {
4132			break
4133		}
4134		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
4135	}
4136}
4137
4138func (s) TestStreamsQuotaRecovery(t *testing.T) {
4139	for _, e := range listTestEnv() {
4140		testStreamsQuotaRecovery(t, e)
4141	}
4142}
4143
4144func testStreamsQuotaRecovery(t *testing.T, e env) {
4145	te := newTest(t, e)
4146	te.declareLogNoise(
4147		"http2Client.notifyError got notified that the client transport was broken",
4148		"Conn.resetTransport failed to create client transport",
4149		"grpc: the connection is closing",
4150	)
4151	te.maxStream = 1 // Allows 1 live stream.
4152	te.startServer(&testServer{security: e.security})
4153	defer te.tearDown()
4154
4155	cc := te.clientConn()
4156	tc := testpb.NewTestServiceClient(cc)
4157	ctx, cancel := context.WithCancel(context.Background())
4158	defer cancel()
4159	if _, err := tc.StreamingInputCall(ctx); err != nil {
4160		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err)
4161	}
4162	// Loop until the new max stream setting is effective.
4163	for {
4164		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
4165		_, err := tc.StreamingInputCall(ctx)
4166		cancel()
4167		if err == nil {
4168			time.Sleep(5 * time.Millisecond)
4169			continue
4170		}
4171		if status.Code(err) == codes.DeadlineExceeded {
4172			break
4173		}
4174		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded)
4175	}
4176
4177	var wg sync.WaitGroup
4178	for i := 0; i < 10; i++ {
4179		wg.Add(1)
4180		go func() {
4181			defer wg.Done()
4182			payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
4183			if err != nil {
4184				t.Error(err)
4185				return
4186			}
4187			req := &testpb.SimpleRequest{
4188				ResponseType: testpb.PayloadType_COMPRESSABLE,
4189				ResponseSize: 1592,
4190				Payload:      payload,
4191			}
4192			// No rpc should go through due to the max streams limit.
4193			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
4194			defer cancel()
4195			if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
4196				t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
4197			}
4198		}()
4199	}
4200	wg.Wait()
4201
4202	cancel()
4203	// A new stream should be allowed after canceling the first one.
4204	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
4205	defer cancel()
4206	if _, err := tc.StreamingInputCall(ctx); err != nil {
4207		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil)
4208	}
4209}
4210
4211func (s) TestCompressServerHasNoSupport(t *testing.T) {
4212	for _, e := range listTestEnv() {
4213		testCompressServerHasNoSupport(t, e)
4214	}
4215}
4216
4217func testCompressServerHasNoSupport(t *testing.T, e env) {
4218	te := newTest(t, e)
4219	te.serverCompression = false
4220	te.clientCompression = false
4221	te.clientNopCompression = true
4222	te.startServer(&testServer{security: e.security})
4223	defer te.tearDown()
4224	tc := testpb.NewTestServiceClient(te.clientConn())
4225
4226	const argSize = 271828
4227	const respSize = 314159
4228	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
4229	if err != nil {
4230		t.Fatal(err)
4231	}
4232	req := &testpb.SimpleRequest{
4233		ResponseType: testpb.PayloadType_COMPRESSABLE,
4234		ResponseSize: respSize,
4235		Payload:      payload,
4236	}
4237	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.Unimplemented {
4238		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code %s", err, codes.Unimplemented)
4239	}
4240	// Streaming RPC
4241	stream, err := tc.FullDuplexCall(context.Background())
4242	if err != nil {
4243		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4244	}
4245	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Unimplemented {
4246		t.Fatalf("%v.Recv() = %v, want error code %s", stream, err, codes.Unimplemented)
4247	}
4248}
4249
4250func (s) TestCompressOK(t *testing.T) {
4251	for _, e := range listTestEnv() {
4252		testCompressOK(t, e)
4253	}
4254}
4255
4256func testCompressOK(t *testing.T, e env) {
4257	te := newTest(t, e)
4258	te.serverCompression = true
4259	te.clientCompression = true
4260	te.startServer(&testServer{security: e.security})
4261	defer te.tearDown()
4262	tc := testpb.NewTestServiceClient(te.clientConn())
4263
4264	// Unary call
4265	const argSize = 271828
4266	const respSize = 314159
4267	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
4268	if err != nil {
4269		t.Fatal(err)
4270	}
4271	req := &testpb.SimpleRequest{
4272		ResponseType: testpb.PayloadType_COMPRESSABLE,
4273		ResponseSize: respSize,
4274		Payload:      payload,
4275	}
4276	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
4277	if _, err := tc.UnaryCall(ctx, req); err != nil {
4278		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
4279	}
4280	// Streaming RPC
4281	ctx, cancel := context.WithCancel(context.Background())
4282	defer cancel()
4283	stream, err := tc.FullDuplexCall(ctx)
4284	if err != nil {
4285		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4286	}
4287	respParam := []*testpb.ResponseParameters{
4288		{
4289			Size: 31415,
4290		},
4291	}
4292	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
4293	if err != nil {
4294		t.Fatal(err)
4295	}
4296	sreq := &testpb.StreamingOutputCallRequest{
4297		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4298		ResponseParameters: respParam,
4299		Payload:            payload,
4300	}
4301	if err := stream.Send(sreq); err != nil {
4302		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
4303	}
4304	stream.CloseSend()
4305	if _, err := stream.Recv(); err != nil {
4306		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
4307	}
4308	if _, err := stream.Recv(); err != io.EOF {
4309		t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err)
4310	}
4311}
4312
4313func (s) TestIdentityEncoding(t *testing.T) {
4314	for _, e := range listTestEnv() {
4315		testIdentityEncoding(t, e)
4316	}
4317}
4318
4319func testIdentityEncoding(t *testing.T, e env) {
4320	te := newTest(t, e)
4321	te.startServer(&testServer{security: e.security})
4322	defer te.tearDown()
4323	tc := testpb.NewTestServiceClient(te.clientConn())
4324
4325	// Unary call
4326	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 5)
4327	if err != nil {
4328		t.Fatal(err)
4329	}
4330	req := &testpb.SimpleRequest{
4331		ResponseType: testpb.PayloadType_COMPRESSABLE,
4332		ResponseSize: 10,
4333		Payload:      payload,
4334	}
4335	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
4336	if _, err := tc.UnaryCall(ctx, req); err != nil {
4337		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
4338	}
4339	// Streaming RPC
4340	ctx, cancel := context.WithCancel(context.Background())
4341	defer cancel()
4342	stream, err := tc.FullDuplexCall(ctx, grpc.UseCompressor("identity"))
4343	if err != nil {
4344		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4345	}
4346	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
4347	if err != nil {
4348		t.Fatal(err)
4349	}
4350	sreq := &testpb.StreamingOutputCallRequest{
4351		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4352		ResponseParameters: []*testpb.ResponseParameters{{Size: 10}},
4353		Payload:            payload,
4354	}
4355	if err := stream.Send(sreq); err != nil {
4356		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
4357	}
4358	stream.CloseSend()
4359	if _, err := stream.Recv(); err != nil {
4360		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
4361	}
4362	if _, err := stream.Recv(); err != io.EOF {
4363		t.Fatalf("%v.Recv() = %v, want io.EOF", stream, err)
4364	}
4365}
4366
4367func (s) TestUnaryClientInterceptor(t *testing.T) {
4368	for _, e := range listTestEnv() {
4369		testUnaryClientInterceptor(t, e)
4370	}
4371}
4372
4373func failOkayRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
4374	err := invoker(ctx, method, req, reply, cc, opts...)
4375	if err == nil {
4376		return status.Error(codes.NotFound, "")
4377	}
4378	return err
4379}
4380
4381func testUnaryClientInterceptor(t *testing.T, e env) {
4382	te := newTest(t, e)
4383	te.userAgent = testAppUA
4384	te.unaryClientInt = failOkayRPC
4385	te.startServer(&testServer{security: e.security})
4386	defer te.tearDown()
4387
4388	tc := testpb.NewTestServiceClient(te.clientConn())
4389	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.NotFound {
4390		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound)
4391	}
4392}
4393
4394func (s) TestStreamClientInterceptor(t *testing.T) {
4395	for _, e := range listTestEnv() {
4396		testStreamClientInterceptor(t, e)
4397	}
4398}
4399
4400func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
4401	s, err := streamer(ctx, desc, cc, method, opts...)
4402	if err == nil {
4403		return nil, status.Error(codes.NotFound, "")
4404	}
4405	return s, nil
4406}
4407
4408func testStreamClientInterceptor(t *testing.T, e env) {
4409	te := newTest(t, e)
4410	te.streamClientInt = failOkayStream
4411	te.startServer(&testServer{security: e.security})
4412	defer te.tearDown()
4413
4414	tc := testpb.NewTestServiceClient(te.clientConn())
4415	respParam := []*testpb.ResponseParameters{
4416		{
4417			Size: int32(1),
4418		},
4419	}
4420	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
4421	if err != nil {
4422		t.Fatal(err)
4423	}
4424	req := &testpb.StreamingOutputCallRequest{
4425		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4426		ResponseParameters: respParam,
4427		Payload:            payload,
4428	}
4429	if _, err := tc.StreamingOutputCall(context.Background(), req); status.Code(err) != codes.NotFound {
4430		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound)
4431	}
4432}
4433
4434func (s) TestUnaryServerInterceptor(t *testing.T) {
4435	for _, e := range listTestEnv() {
4436		testUnaryServerInterceptor(t, e)
4437	}
4438}
4439
4440func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
4441	return nil, status.Error(codes.PermissionDenied, "")
4442}
4443
4444func testUnaryServerInterceptor(t *testing.T, e env) {
4445	te := newTest(t, e)
4446	te.unaryServerInt = errInjector
4447	te.startServer(&testServer{security: e.security})
4448	defer te.tearDown()
4449
4450	tc := testpb.NewTestServiceClient(te.clientConn())
4451	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.PermissionDenied {
4452		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
4453	}
4454}
4455
4456func (s) TestStreamServerInterceptor(t *testing.T) {
4457	for _, e := range listTestEnv() {
4458		// TODO(bradfitz): Temporarily skip this env due to #619.
4459		if e.name == "handler-tls" {
4460			continue
4461		}
4462		testStreamServerInterceptor(t, e)
4463	}
4464}
4465
4466func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
4467	if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" {
4468		return handler(srv, ss)
4469	}
4470	// Reject the other methods.
4471	return status.Error(codes.PermissionDenied, "")
4472}
4473
4474func testStreamServerInterceptor(t *testing.T, e env) {
4475	te := newTest(t, e)
4476	te.streamServerInt = fullDuplexOnly
4477	te.startServer(&testServer{security: e.security})
4478	defer te.tearDown()
4479
4480	tc := testpb.NewTestServiceClient(te.clientConn())
4481	respParam := []*testpb.ResponseParameters{
4482		{
4483			Size: int32(1),
4484		},
4485	}
4486	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
4487	if err != nil {
4488		t.Fatal(err)
4489	}
4490	req := &testpb.StreamingOutputCallRequest{
4491		ResponseType:       testpb.PayloadType_COMPRESSABLE,
4492		ResponseParameters: respParam,
4493		Payload:            payload,
4494	}
4495	s1, err := tc.StreamingOutputCall(context.Background(), req)
4496	if err != nil {
4497		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
4498	}
4499	if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied {
4500		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
4501	}
4502	s2, err := tc.FullDuplexCall(context.Background())
4503	if err != nil {
4504		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4505	}
4506	if err := s2.Send(req); err != nil {
4507		t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err)
4508	}
4509	if _, err := s2.Recv(); err != nil {
4510		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err)
4511	}
4512}
4513
4514// funcServer implements methods of TestServiceServer using funcs,
4515// similar to an http.HandlerFunc.
4516// Any unimplemented method will crash. Tests implement the method(s)
4517// they need.
4518type funcServer struct {
4519	testpb.TestServiceServer
4520	unaryCall          func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
4521	streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error
4522	fullDuplexCall     func(stream testpb.TestService_FullDuplexCallServer) error
4523}
4524
4525func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4526	return s.unaryCall(ctx, in)
4527}
4528
4529func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
4530	return s.streamingInputCall(stream)
4531}
4532
4533func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
4534	return s.fullDuplexCall(stream)
4535}
4536
4537func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
4538	for _, e := range listTestEnv() {
4539		testClientRequestBodyErrorUnexpectedEOF(t, e)
4540	}
4541}
4542
4543func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
4544	te := newTest(t, e)
4545	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4546		errUnexpectedCall := errors.New("unexpected call func server method")
4547		t.Error(errUnexpectedCall)
4548		return nil, errUnexpectedCall
4549	}}
4550	te.startServer(ts)
4551	defer te.tearDown()
4552	te.withServerTester(func(st *serverTester) {
4553		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4554		// Say we have 5 bytes coming, but set END_STREAM flag:
4555		st.writeData(1, true, []byte{0, 0, 0, 0, 5})
4556		st.wantAnyFrame() // wait for server to crash (it used to crash)
4557	})
4558}
4559
4560func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
4561	for _, e := range listTestEnv() {
4562		testClientRequestBodyErrorCloseAfterLength(t, e)
4563	}
4564}
4565
4566func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
4567	te := newTest(t, e)
4568	te.declareLogNoise("Server.processUnaryRPC failed to write status")
4569	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4570		errUnexpectedCall := errors.New("unexpected call func server method")
4571		t.Error(errUnexpectedCall)
4572		return nil, errUnexpectedCall
4573	}}
4574	te.startServer(ts)
4575	defer te.tearDown()
4576	te.withServerTester(func(st *serverTester) {
4577		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4578		// say we're sending 5 bytes, but then close the connection instead.
4579		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4580		st.cc.Close()
4581	})
4582}
4583
4584func (s) TestClientRequestBodyErrorCancel(t *testing.T) {
4585	for _, e := range listTestEnv() {
4586		testClientRequestBodyErrorCancel(t, e)
4587	}
4588}
4589
4590func testClientRequestBodyErrorCancel(t *testing.T, e env) {
4591	te := newTest(t, e)
4592	gotCall := make(chan bool, 1)
4593	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
4594		gotCall <- true
4595		return new(testpb.SimpleResponse), nil
4596	}}
4597	te.startServer(ts)
4598	defer te.tearDown()
4599	te.withServerTester(func(st *serverTester) {
4600		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall")
4601		// Say we have 5 bytes coming, but cancel it instead.
4602		st.writeRSTStream(1, http2.ErrCodeCancel)
4603		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4604
4605		// Verify we didn't a call yet.
4606		select {
4607		case <-gotCall:
4608			t.Fatal("unexpected call")
4609		default:
4610		}
4611
4612		// And now send an uncanceled (but still invalid), just to get a response.
4613		st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall")
4614		st.writeData(3, true, []byte{0, 0, 0, 0, 0})
4615		<-gotCall
4616		st.wantAnyFrame()
4617	})
4618}
4619
4620func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
4621	for _, e := range listTestEnv() {
4622		testClientRequestBodyErrorCancelStreamingInput(t, e)
4623	}
4624}
4625
4626func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
4627	te := newTest(t, e)
4628	recvErr := make(chan error, 1)
4629	ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
4630		_, err := stream.Recv()
4631		recvErr <- err
4632		return nil
4633	}}
4634	te.startServer(ts)
4635	defer te.tearDown()
4636	te.withServerTester(func(st *serverTester) {
4637		st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall")
4638		// Say we have 5 bytes coming, but cancel it instead.
4639		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4640		st.writeRSTStream(1, http2.ErrCodeCancel)
4641
4642		var got error
4643		select {
4644		case got = <-recvErr:
4645		case <-time.After(3 * time.Second):
4646			t.Fatal("timeout waiting for error")
4647		}
4648		if grpc.Code(got) != codes.Canceled {
4649			t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
4650		}
4651	})
4652}
4653
4654func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) {
4655	for _, e := range listTestEnv() {
4656		if e.httpHandler {
4657			// httpHandler write won't be blocked on flow control window.
4658			continue
4659		}
4660		testClientResourceExhaustedCancelFullDuplex(t, e)
4661	}
4662}
4663
4664func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) {
4665	te := newTest(t, e)
4666	recvErr := make(chan error, 1)
4667	ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
4668		defer close(recvErr)
4669		_, err := stream.Recv()
4670		if err != nil {
4671			return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err)
4672		}
4673		// create a payload that's larger than the default flow control window.
4674		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10)
4675		if err != nil {
4676			return err
4677		}
4678		resp := &testpb.StreamingOutputCallResponse{
4679			Payload: payload,
4680		}
4681		ce := make(chan error)
4682		go func() {
4683			var err error
4684			for {
4685				if err = stream.Send(resp); err != nil {
4686					break
4687				}
4688			}
4689			ce <- err
4690		}()
4691		select {
4692		case err = <-ce:
4693		case <-time.After(10 * time.Second):
4694			err = errors.New("10s timeout reached")
4695		}
4696		recvErr <- err
4697		return err
4698	}}
4699	te.startServer(ts)
4700	defer te.tearDown()
4701	// set a low limit on receive message size to error with Resource Exhausted on
4702	// client side when server send a large message.
4703	te.maxClientReceiveMsgSize = newInt(10)
4704	cc := te.clientConn()
4705	tc := testpb.NewTestServiceClient(cc)
4706	stream, err := tc.FullDuplexCall(context.Background())
4707	if err != nil {
4708		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4709	}
4710	req := &testpb.StreamingOutputCallRequest{}
4711	if err := stream.Send(req); err != nil {
4712		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
4713	}
4714	if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
4715		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
4716	}
4717	err = <-recvErr
4718	if status.Code(err) != codes.Canceled {
4719		t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled)
4720	}
4721}
4722
4723type clientTimeoutCreds struct {
4724	timeoutReturned bool
4725}
4726
4727func (c *clientTimeoutCreds) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4728	if !c.timeoutReturned {
4729		c.timeoutReturned = true
4730		return nil, nil, context.DeadlineExceeded
4731	}
4732	return rawConn, nil, nil
4733}
4734func (c *clientTimeoutCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4735	return rawConn, nil, nil
4736}
4737func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo {
4738	return credentials.ProtocolInfo{}
4739}
4740func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials {
4741	return nil
4742}
4743func (c *clientTimeoutCreds) OverrideServerName(s string) error {
4744	return nil
4745}
4746
4747func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
4748	te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"})
4749	te.userAgent = testAppUA
4750	te.startServer(&testServer{security: te.e.security})
4751	defer te.tearDown()
4752
4753	cc := te.clientConn()
4754	tc := testpb.NewTestServiceClient(cc)
4755	// This unary call should succeed, because ClientHandshake will succeed for the second time.
4756	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
4757		te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <nil>", err)
4758	}
4759}
4760
4761type serverDispatchCred struct {
4762	rawConnCh chan net.Conn
4763}
4764
4765func newServerDispatchCred() *serverDispatchCred {
4766	return &serverDispatchCred{
4767		rawConnCh: make(chan net.Conn, 1),
4768	}
4769}
4770func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4771	return rawConn, nil, nil
4772}
4773func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4774	select {
4775	case c.rawConnCh <- rawConn:
4776	default:
4777	}
4778	return nil, nil, credentials.ErrConnDispatched
4779}
4780func (c *serverDispatchCred) Info() credentials.ProtocolInfo {
4781	return credentials.ProtocolInfo{}
4782}
4783func (c *serverDispatchCred) Clone() credentials.TransportCredentials {
4784	return nil
4785}
4786func (c *serverDispatchCred) OverrideServerName(s string) error {
4787	return nil
4788}
4789func (c *serverDispatchCred) getRawConn() net.Conn {
4790	return <-c.rawConnCh
4791}
4792
4793func (s) TestServerCredsDispatch(t *testing.T) {
4794	lis, err := net.Listen("tcp", "localhost:0")
4795	if err != nil {
4796		t.Fatalf("Failed to listen: %v", err)
4797	}
4798	cred := newServerDispatchCred()
4799	s := grpc.NewServer(grpc.Creds(cred))
4800	go s.Serve(lis)
4801	defer s.Stop()
4802
4803	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred))
4804	if err != nil {
4805		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4806	}
4807	defer cc.Close()
4808
4809	rawConn := cred.getRawConn()
4810	// Give grpc a chance to see the error and potentially close the connection.
4811	// And check that connection is not closed after that.
4812	time.Sleep(100 * time.Millisecond)
4813	// Check rawConn is not closed.
4814	if n, err := rawConn.Write([]byte{0}); n <= 0 || err != nil {
4815		t.Errorf("Read() = %v, %v; want n>0, <nil>", n, err)
4816	}
4817}
4818
4819type authorityCheckCreds struct {
4820	got string
4821}
4822
4823func (c *authorityCheckCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4824	return rawConn, nil, nil
4825}
4826func (c *authorityCheckCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4827	c.got = authority
4828	return rawConn, nil, nil
4829}
4830func (c *authorityCheckCreds) Info() credentials.ProtocolInfo {
4831	return credentials.ProtocolInfo{}
4832}
4833func (c *authorityCheckCreds) Clone() credentials.TransportCredentials {
4834	return c
4835}
4836func (c *authorityCheckCreds) OverrideServerName(s string) error {
4837	return nil
4838}
4839
4840// This test makes sure that the authority client handshake gets is the endpoint
4841// in dial target, not the resolved ip address.
4842func (s) TestCredsHandshakeAuthority(t *testing.T) {
4843	const testAuthority = "test.auth.ori.ty"
4844
4845	lis, err := net.Listen("tcp", "localhost:0")
4846	if err != nil {
4847		t.Fatalf("Failed to listen: %v", err)
4848	}
4849	cred := &authorityCheckCreds{}
4850	s := grpc.NewServer()
4851	go s.Serve(lis)
4852	defer s.Stop()
4853
4854	r, rcleanup := manual.GenerateAndRegisterManualResolver()
4855	defer rcleanup()
4856
4857	cc, err := grpc.Dial(r.Scheme()+":///"+testAuthority, grpc.WithTransportCredentials(cred))
4858	if err != nil {
4859		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4860	}
4861	defer cc.Close()
4862	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
4863
4864	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
4865	defer cancel()
4866	for {
4867		s := cc.GetState()
4868		if s == connectivity.Ready {
4869			break
4870		}
4871		if !cc.WaitForStateChange(ctx, s) {
4872			// ctx got timeout or canceled.
4873			t.Fatalf("ClientConn is not ready after 100 ms")
4874		}
4875	}
4876
4877	if cred.got != testAuthority {
4878		t.Fatalf("client creds got authority: %q, want: %q", cred.got, testAuthority)
4879	}
4880}
4881
4882type clientFailCreds struct{}
4883
4884func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4885	return rawConn, nil, nil
4886}
4887func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4888	return nil, nil, fmt.Errorf("client handshake fails with fatal error")
4889}
4890func (c *clientFailCreds) Info() credentials.ProtocolInfo {
4891	return credentials.ProtocolInfo{}
4892}
4893func (c *clientFailCreds) Clone() credentials.TransportCredentials {
4894	return c
4895}
4896func (c *clientFailCreds) OverrideServerName(s string) error {
4897	return nil
4898}
4899
4900// This test makes sure that failfast RPCs fail if client handshake fails with
4901// fatal errors.
4902func (s) TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) {
4903	lis, err := net.Listen("tcp", "localhost:0")
4904	if err != nil {
4905		t.Fatalf("Failed to listen: %v", err)
4906	}
4907	defer lis.Close()
4908
4909	cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{}))
4910	if err != nil {
4911		t.Fatalf("grpc.Dial(_) = %v", err)
4912	}
4913	defer cc.Close()
4914
4915	tc := testpb.NewTestServiceClient(cc)
4916	// This unary call should fail, but not timeout.
4917	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
4918	defer cancel()
4919	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable {
4920		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err)
4921	}
4922}
4923
4924func (s) TestFlowControlLogicalRace(t *testing.T) {
4925	// Test for a regression of https://github.com/grpc/grpc-go/issues/632,
4926	// and other flow control bugs.
4927
4928	const (
4929		itemCount   = 100
4930		itemSize    = 1 << 10
4931		recvCount   = 2
4932		maxFailures = 3
4933
4934		requestTimeout = time.Second * 5
4935	)
4936
4937	requestCount := 10000
4938	if raceMode {
4939		requestCount = 1000
4940	}
4941
4942	lis, err := net.Listen("tcp", "localhost:0")
4943	if err != nil {
4944		t.Fatalf("Failed to listen: %v", err)
4945	}
4946	defer lis.Close()
4947
4948	s := grpc.NewServer()
4949	testpb.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{
4950		itemCount: itemCount,
4951		itemSize:  itemSize,
4952	})
4953	defer s.Stop()
4954
4955	go s.Serve(lis)
4956
4957	ctx := context.Background()
4958
4959	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
4960	if err != nil {
4961		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4962	}
4963	defer cc.Close()
4964	cl := testpb.NewTestServiceClient(cc)
4965
4966	failures := 0
4967	for i := 0; i < requestCount; i++ {
4968		ctx, cancel := context.WithTimeout(ctx, requestTimeout)
4969		output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
4970		if err != nil {
4971			t.Fatalf("StreamingOutputCall; err = %q", err)
4972		}
4973
4974		j := 0
4975	loop:
4976		for ; j < recvCount; j++ {
4977			_, err := output.Recv()
4978			if err != nil {
4979				if err == io.EOF {
4980					break loop
4981				}
4982				switch status.Code(err) {
4983				case codes.DeadlineExceeded:
4984					break loop
4985				default:
4986					t.Fatalf("Recv; err = %q", err)
4987				}
4988			}
4989		}
4990		cancel()
4991		<-ctx.Done()
4992
4993		if j < recvCount {
4994			t.Errorf("got %d responses to request %d", j, i)
4995			failures++
4996			if failures >= maxFailures {
4997				// Continue past the first failure to see if the connection is
4998				// entirely broken, or if only a single RPC was affected
4999				break
5000			}
5001		}
5002	}
5003}
5004
5005type flowControlLogicalRaceServer struct {
5006	testpb.TestServiceServer
5007
5008	itemSize  int
5009	itemCount int
5010}
5011
5012func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testpb.TestService_StreamingOutputCallServer) error {
5013	for i := 0; i < s.itemCount; i++ {
5014		err := srv.Send(&testpb.StreamingOutputCallResponse{
5015			Payload: &testpb.Payload{
5016				// Sending a large stream of data which the client reject
5017				// helps to trigger some types of flow control bugs.
5018				//
5019				// Reallocating memory here is inefficient, but the stress it
5020				// puts on the GC leads to more frequent flow control
5021				// failures. The GC likely causes more variety in the
5022				// goroutine scheduling orders.
5023				Body: bytes.Repeat([]byte("a"), s.itemSize),
5024			},
5025		})
5026		if err != nil {
5027			return err
5028		}
5029	}
5030	return nil
5031}
5032
5033type lockingWriter struct {
5034	mu sync.Mutex
5035	w  io.Writer
5036}
5037
5038func (lw *lockingWriter) Write(p []byte) (n int, err error) {
5039	lw.mu.Lock()
5040	defer lw.mu.Unlock()
5041	return lw.w.Write(p)
5042}
5043
5044func (lw *lockingWriter) setWriter(w io.Writer) {
5045	lw.mu.Lock()
5046	defer lw.mu.Unlock()
5047	lw.w = w
5048}
5049
5050var testLogOutput = &lockingWriter{w: os.Stderr}
5051
5052// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
5053// terminate, if they're still running. It spams logs with this
5054// message.  We wait for it so our log filter is still
5055// active. Otherwise the "defer restore()" at the top of various test
5056// functions restores our log filter and then the goroutine spams.
5057func awaitNewConnLogOutput() {
5058	awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
5059}
5060
5061func awaitLogOutput(maxWait time.Duration, phrase string) {
5062	pb := []byte(phrase)
5063
5064	timer := time.NewTimer(maxWait)
5065	defer timer.Stop()
5066	wakeup := make(chan bool, 1)
5067	for {
5068		if logOutputHasContents(pb, wakeup) {
5069			return
5070		}
5071		select {
5072		case <-timer.C:
5073			// Too slow. Oh well.
5074			return
5075		case <-wakeup:
5076		}
5077	}
5078}
5079
5080func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
5081	testLogOutput.mu.Lock()
5082	defer testLogOutput.mu.Unlock()
5083	fw, ok := testLogOutput.w.(*filterWriter)
5084	if !ok {
5085		return false
5086	}
5087	fw.mu.Lock()
5088	defer fw.mu.Unlock()
5089	if bytes.Contains(fw.buf.Bytes(), v) {
5090		return true
5091	}
5092	fw.wakeup = wakeup
5093	return false
5094}
5095
5096var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering")
5097
5098func noop() {}
5099
5100// declareLogNoise declares that t is expected to emit the following noisy phrases,
5101// even on success. Those phrases will be filtered from grpclog output
5102// and only be shown if *verbose_logs or t ends up failing.
5103// The returned restore function should be called with defer to be run
5104// before the test ends.
5105func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
5106	if *verboseLogs {
5107		return noop
5108	}
5109	fw := &filterWriter{dst: os.Stderr, filter: phrases}
5110	testLogOutput.setWriter(fw)
5111	return func() {
5112		if t.Failed() {
5113			fw.mu.Lock()
5114			defer fw.mu.Unlock()
5115			if fw.buf.Len() > 0 {
5116				t.Logf("Complete log output:\n%s", fw.buf.Bytes())
5117			}
5118		}
5119		testLogOutput.setWriter(os.Stderr)
5120	}
5121}
5122
5123type filterWriter struct {
5124	dst    io.Writer
5125	filter []string
5126
5127	mu     sync.Mutex
5128	buf    bytes.Buffer
5129	wakeup chan<- bool // if non-nil, gets true on write
5130}
5131
5132func (fw *filterWriter) Write(p []byte) (n int, err error) {
5133	fw.mu.Lock()
5134	fw.buf.Write(p)
5135	if fw.wakeup != nil {
5136		select {
5137		case fw.wakeup <- true:
5138		default:
5139		}
5140	}
5141	fw.mu.Unlock()
5142
5143	ps := string(p)
5144	for _, f := range fw.filter {
5145		if strings.Contains(ps, f) {
5146			return len(p), nil
5147		}
5148	}
5149	return fw.dst.Write(p)
5150}
5151
5152// stubServer is a server that is easy to customize within individual test
5153// cases.
5154type stubServer struct {
5155	// Guarantees we satisfy this interface; panics if unimplemented methods are called.
5156	testpb.TestServiceServer
5157
5158	// Customizable implementations of server handlers.
5159	emptyCall      func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error)
5160	unaryCall      func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
5161	fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error
5162
5163	// A client connected to this service the test may use.  Created in Start().
5164	client testpb.TestServiceClient
5165	cc     *grpc.ClientConn
5166
5167	addr string // address of listener
5168
5169	cleanups []func() // Lambdas executed in Stop(); populated by Start().
5170
5171	r *manual.Resolver
5172}
5173
5174func (ss *stubServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5175	return ss.emptyCall(ctx, in)
5176}
5177
5178func (ss *stubServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
5179	return ss.unaryCall(ctx, in)
5180}
5181
5182func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
5183	return ss.fullDuplexCall(stream)
5184}
5185
5186// Start starts the server and creates a client connected to it.
5187func (ss *stubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption) error {
5188	r, cleanup := manual.GenerateAndRegisterManualResolver()
5189	ss.r = r
5190	ss.cleanups = append(ss.cleanups, cleanup)
5191
5192	lis, err := net.Listen("tcp", "localhost:0")
5193	if err != nil {
5194		return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err)
5195	}
5196	ss.addr = lis.Addr().String()
5197	ss.cleanups = append(ss.cleanups, func() { lis.Close() })
5198
5199	s := grpc.NewServer(sopts...)
5200	testpb.RegisterTestServiceServer(s, ss)
5201	go s.Serve(lis)
5202	ss.cleanups = append(ss.cleanups, s.Stop)
5203
5204	target := ss.r.Scheme() + ":///" + ss.addr
5205
5206	opts := append([]grpc.DialOption{grpc.WithInsecure()}, dopts...)
5207	cc, err := grpc.Dial(target, opts...)
5208	if err != nil {
5209		return fmt.Errorf("grpc.Dial(%q) = %v", target, err)
5210	}
5211	ss.cc = cc
5212	ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}})
5213	if err := ss.waitForReady(cc); err != nil {
5214		return err
5215	}
5216
5217	ss.cleanups = append(ss.cleanups, func() { cc.Close() })
5218
5219	ss.client = testpb.NewTestServiceClient(cc)
5220	return nil
5221}
5222
5223func (ss *stubServer) newServiceConfig(sc string) {
5224	ss.r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: ss.addr}}, ServiceConfig: sc})
5225}
5226
5227func (ss *stubServer) waitForReady(cc *grpc.ClientConn) error {
5228	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
5229	defer cancel()
5230	for {
5231		s := cc.GetState()
5232		if s == connectivity.Ready {
5233			return nil
5234		}
5235		if !cc.WaitForStateChange(ctx, s) {
5236			// ctx got timeout or canceled.
5237			return ctx.Err()
5238		}
5239	}
5240}
5241
5242func (ss *stubServer) Stop() {
5243	for i := len(ss.cleanups) - 1; i >= 0; i-- {
5244		ss.cleanups[i]()
5245	}
5246}
5247
5248func (s) TestGRPCMethod(t *testing.T) {
5249	var method string
5250	var ok bool
5251
5252	ss := &stubServer{
5253		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5254			method, ok = grpc.Method(ctx)
5255			return &testpb.Empty{}, nil
5256		},
5257	}
5258	if err := ss.Start(nil); err != nil {
5259		t.Fatalf("Error starting endpoint server: %v", err)
5260	}
5261	defer ss.Stop()
5262
5263	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5264	defer cancel()
5265
5266	if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5267		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, nil", err)
5268	}
5269
5270	if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want {
5271		t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want)
5272	}
5273}
5274
5275func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
5276	const mdkey = "somedata"
5277
5278	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
5279	endpoint := &stubServer{
5280		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5281			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
5282				return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5283			}
5284			return &testpb.Empty{}, nil
5285		},
5286	}
5287	if err := endpoint.Start(nil); err != nil {
5288		t.Fatalf("Error starting endpoint server: %v", err)
5289	}
5290	defer endpoint.Stop()
5291
5292	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
5293	// without explicitly copying the metadata.
5294	proxy := &stubServer{
5295		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5296			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
5297				return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey)
5298			}
5299			return endpoint.client.EmptyCall(ctx, in)
5300		},
5301	}
5302	if err := proxy.Start(nil); err != nil {
5303		t.Fatalf("Error starting proxy server: %v", err)
5304	}
5305	defer proxy.Stop()
5306
5307	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5308	defer cancel()
5309	md := metadata.Pairs(mdkey, "val")
5310	ctx = metadata.NewOutgoingContext(ctx, md)
5311
5312	// Sanity check that endpoint properly errors when it sees mdkey.
5313	_, err := endpoint.client.EmptyCall(ctx, &testpb.Empty{})
5314	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
5315		t.Fatalf("endpoint.client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err)
5316	}
5317
5318	if _, err := proxy.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5319		t.Fatal(err.Error())
5320	}
5321}
5322
5323func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
5324	const mdkey = "somedata"
5325
5326	// doFDC performs a FullDuplexCall with client and returns the error from the
5327	// first stream.Recv call, or nil if that error is io.EOF.  Calls t.Fatal if
5328	// the stream cannot be established.
5329	doFDC := func(ctx context.Context, client testpb.TestServiceClient) error {
5330		stream, err := client.FullDuplexCall(ctx)
5331		if err != nil {
5332			t.Fatalf("Unwanted error: %v", err)
5333		}
5334		if _, err := stream.Recv(); err != io.EOF {
5335			return err
5336		}
5337		return nil
5338	}
5339
5340	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
5341	endpoint := &stubServer{
5342		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5343			ctx := stream.Context()
5344			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
5345				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5346			}
5347			return nil
5348		},
5349	}
5350	if err := endpoint.Start(nil); err != nil {
5351		t.Fatalf("Error starting endpoint server: %v", err)
5352	}
5353	defer endpoint.Stop()
5354
5355	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
5356	// without explicitly copying the metadata.
5357	proxy := &stubServer{
5358		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5359			ctx := stream.Context()
5360			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
5361				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
5362			}
5363			return doFDC(ctx, endpoint.client)
5364		},
5365	}
5366	if err := proxy.Start(nil); err != nil {
5367		t.Fatalf("Error starting proxy server: %v", err)
5368	}
5369	defer proxy.Stop()
5370
5371	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5372	defer cancel()
5373	md := metadata.Pairs(mdkey, "val")
5374	ctx = metadata.NewOutgoingContext(ctx, md)
5375
5376	// Sanity check that endpoint properly errors when it sees mdkey in ctx.
5377	err := doFDC(ctx, endpoint.client)
5378	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
5379		t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err)
5380	}
5381
5382	if err := doFDC(ctx, proxy.client); err != nil {
5383		t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
5384	}
5385}
5386
5387func (s) TestStatsTagsAndTrace(t *testing.T) {
5388	// Data added to context by client (typically in a stats handler).
5389	tags := []byte{1, 5, 2, 4, 3}
5390	trace := []byte{5, 2, 1, 3, 4}
5391
5392	// endpoint ensures Tags() and Trace() in context match those that were added
5393	// by the client and returns an error if not.
5394	endpoint := &stubServer{
5395		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5396			md, _ := metadata.FromIncomingContext(ctx)
5397			if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
5398				return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
5399			}
5400			if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
5401				return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
5402			}
5403			if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
5404				return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
5405			}
5406			if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
5407				return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
5408			}
5409			return &testpb.Empty{}, nil
5410		},
5411	}
5412	if err := endpoint.Start(nil); err != nil {
5413		t.Fatalf("Error starting endpoint server: %v", err)
5414	}
5415	defer endpoint.Stop()
5416
5417	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5418	defer cancel()
5419
5420	testCases := []struct {
5421		ctx  context.Context
5422		want codes.Code
5423	}{
5424		{ctx: ctx, want: codes.Internal},
5425		{ctx: stats.SetTags(ctx, tags), want: codes.Internal},
5426		{ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
5427		{ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
5428		{ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
5429	}
5430
5431	for _, tc := range testCases {
5432		_, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{})
5433		if tc.want == codes.OK && err != nil {
5434			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
5435		}
5436		if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
5437			t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
5438		}
5439	}
5440}
5441
5442func (s) TestTapTimeout(t *testing.T) {
5443	sopts := []grpc.ServerOption{
5444		grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) {
5445			c, cancel := context.WithCancel(ctx)
5446			// Call cancel instead of setting a deadline so we can detect which error
5447			// occurred -- this cancellation (desired) or the client's deadline
5448			// expired (indicating this cancellation did not affect the RPC).
5449			time.AfterFunc(10*time.Millisecond, cancel)
5450			return c, nil
5451		}),
5452	}
5453
5454	ss := &stubServer{
5455		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5456			<-ctx.Done()
5457			return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
5458		},
5459	}
5460	if err := ss.Start(sopts); err != nil {
5461		t.Fatalf("Error starting endpoint server: %v", err)
5462	}
5463	defer ss.Stop()
5464
5465	// This was known to be flaky; test several times.
5466	for i := 0; i < 10; i++ {
5467		// Set our own deadline in case the server hangs.
5468		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
5469		res, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
5470		cancel()
5471		if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
5472			t.Fatalf("ss.client.EmptyCall(context.Background(), _) = %v, %v; want nil, <status with Code()=Canceled>", res, err)
5473		}
5474	}
5475
5476}
5477
5478func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
5479	ss := &stubServer{
5480		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
5481			return status.Errorf(codes.Internal, "")
5482		},
5483	}
5484	sopts := []grpc.ServerOption{}
5485	if err := ss.Start(sopts); err != nil {
5486		t.Fatalf("Error starting endpoint server: %v", err)
5487	}
5488	defer ss.Stop()
5489	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
5490	defer cancel()
5491	stream, err := ss.client.FullDuplexCall(ctx)
5492	if err != nil {
5493		t.Fatalf("Error while creating stream: %v", err)
5494	}
5495	for {
5496		if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil {
5497			time.Sleep(5 * time.Millisecond)
5498		} else if err == io.EOF {
5499			break // Success.
5500		} else {
5501			t.Fatalf("stream.Send(_) = %v, want io.EOF", err)
5502		}
5503	}
5504}
5505
5506type windowSizeConfig struct {
5507	serverStream int32
5508	serverConn   int32
5509	clientStream int32
5510	clientConn   int32
5511}
5512
5513func max(a, b int32) int32 {
5514	if a > b {
5515		return a
5516	}
5517	return b
5518}
5519
5520func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
5521	wc := windowSizeConfig{
5522		serverStream: 8 * 1024 * 1024,
5523		serverConn:   12 * 1024 * 1024,
5524		clientStream: 6 * 1024 * 1024,
5525		clientConn:   8 * 1024 * 1024,
5526	}
5527	for _, e := range listTestEnv() {
5528		testConfigurableWindowSize(t, e, wc)
5529	}
5530}
5531
5532func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
5533	wc := windowSizeConfig{
5534		serverStream: 1,
5535		serverConn:   1,
5536		clientStream: 1,
5537		clientConn:   1,
5538	}
5539	for _, e := range listTestEnv() {
5540		testConfigurableWindowSize(t, e, wc)
5541	}
5542}
5543
5544func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
5545	te := newTest(t, e)
5546	te.serverInitialWindowSize = wc.serverStream
5547	te.serverInitialConnWindowSize = wc.serverConn
5548	te.clientInitialWindowSize = wc.clientStream
5549	te.clientInitialConnWindowSize = wc.clientConn
5550
5551	te.startServer(&testServer{security: e.security})
5552	defer te.tearDown()
5553
5554	cc := te.clientConn()
5555	tc := testpb.NewTestServiceClient(cc)
5556	stream, err := tc.FullDuplexCall(context.Background())
5557	if err != nil {
5558		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5559	}
5560	numOfIter := 11
5561	// Set message size to exhaust largest of window sizes.
5562	messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
5563	messageSize = max(messageSize, 64*1024)
5564	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
5565	if err != nil {
5566		t.Fatal(err)
5567	}
5568	respParams := []*testpb.ResponseParameters{
5569		{
5570			Size: messageSize,
5571		},
5572	}
5573	req := &testpb.StreamingOutputCallRequest{
5574		ResponseType:       testpb.PayloadType_COMPRESSABLE,
5575		ResponseParameters: respParams,
5576		Payload:            payload,
5577	}
5578	for i := 0; i < numOfIter; i++ {
5579		if err := stream.Send(req); err != nil {
5580			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
5581		}
5582		if _, err := stream.Recv(); err != nil {
5583			t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
5584		}
5585	}
5586	if err := stream.CloseSend(); err != nil {
5587		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
5588	}
5589}
5590
5591var (
5592	// test authdata
5593	authdata = map[string]string{
5594		"test-key":      "test-value",
5595		"test-key2-bin": string([]byte{1, 2, 3}),
5596	}
5597)
5598
5599type testPerRPCCredentials struct{}
5600
5601func (cr testPerRPCCredentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
5602	return authdata, nil
5603}
5604
5605func (cr testPerRPCCredentials) RequireTransportSecurity() bool {
5606	return false
5607}
5608
5609func authHandle(ctx context.Context, info *tap.Info) (context.Context, error) {
5610	md, ok := metadata.FromIncomingContext(ctx)
5611	if !ok {
5612		return ctx, fmt.Errorf("didn't find metadata in context")
5613	}
5614	for k, vwant := range authdata {
5615		vgot, ok := md[k]
5616		if !ok {
5617			return ctx, fmt.Errorf("didn't find authdata key %v in context", k)
5618		}
5619		if vgot[0] != vwant {
5620			return ctx, fmt.Errorf("for key %v, got value %v, want %v", k, vgot, vwant)
5621		}
5622	}
5623	return ctx, nil
5624}
5625
5626func (s) TestPerRPCCredentialsViaDialOptions(t *testing.T) {
5627	for _, e := range listTestEnv() {
5628		testPerRPCCredentialsViaDialOptions(t, e)
5629	}
5630}
5631
5632func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) {
5633	te := newTest(t, e)
5634	te.tapHandle = authHandle
5635	te.perRPCCreds = testPerRPCCredentials{}
5636	te.startServer(&testServer{security: e.security})
5637	defer te.tearDown()
5638
5639	cc := te.clientConn()
5640	tc := testpb.NewTestServiceClient(cc)
5641	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
5642		t.Fatalf("Test failed. Reason: %v", err)
5643	}
5644}
5645
5646func (s) TestPerRPCCredentialsViaCallOptions(t *testing.T) {
5647	for _, e := range listTestEnv() {
5648		testPerRPCCredentialsViaCallOptions(t, e)
5649	}
5650}
5651
5652func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) {
5653	te := newTest(t, e)
5654	te.tapHandle = authHandle
5655	te.startServer(&testServer{security: e.security})
5656	defer te.tearDown()
5657
5658	cc := te.clientConn()
5659	tc := testpb.NewTestServiceClient(cc)
5660	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
5661		t.Fatalf("Test failed. Reason: %v", err)
5662	}
5663}
5664
5665func (s) TestPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T) {
5666	for _, e := range listTestEnv() {
5667		testPerRPCCredentialsViaDialOptionsAndCallOptions(t, e)
5668	}
5669}
5670
5671func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) {
5672	te := newTest(t, e)
5673	te.perRPCCreds = testPerRPCCredentials{}
5674	// When credentials are provided via both dial options and call options,
5675	// we apply both sets.
5676	te.tapHandle = func(ctx context.Context, _ *tap.Info) (context.Context, error) {
5677		md, ok := metadata.FromIncomingContext(ctx)
5678		if !ok {
5679			return ctx, fmt.Errorf("couldn't find metadata in context")
5680		}
5681		for k, vwant := range authdata {
5682			vgot, ok := md[k]
5683			if !ok {
5684				return ctx, fmt.Errorf("couldn't find metadata for key %v", k)
5685			}
5686			if len(vgot) != 2 {
5687				return ctx, fmt.Errorf("len of value for key %v was %v, want 2", k, len(vgot))
5688			}
5689			if vgot[0] != vwant || vgot[1] != vwant {
5690				return ctx, fmt.Errorf("value for %v was %v, want [%v, %v]", k, vgot, vwant, vwant)
5691			}
5692		}
5693		return ctx, nil
5694	}
5695	te.startServer(&testServer{security: e.security})
5696	defer te.tearDown()
5697
5698	cc := te.clientConn()
5699	tc := testpb.NewTestServiceClient(cc)
5700	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.PerRPCCredentials(testPerRPCCredentials{})); err != nil {
5701		t.Fatalf("Test failed. Reason: %v", err)
5702	}
5703}
5704
5705func (s) TestWaitForReadyConnection(t *testing.T) {
5706	for _, e := range listTestEnv() {
5707		testWaitForReadyConnection(t, e)
5708	}
5709
5710}
5711
5712func testWaitForReadyConnection(t *testing.T, e env) {
5713	te := newTest(t, e)
5714	te.userAgent = testAppUA
5715	te.startServer(&testServer{security: e.security})
5716	defer te.tearDown()
5717
5718	cc := te.clientConn() // Non-blocking dial.
5719	tc := testpb.NewTestServiceClient(cc)
5720	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
5721	defer cancel()
5722	state := cc.GetState()
5723	// Wait for connection to be Ready.
5724	for ; state != connectivity.Ready && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
5725	}
5726	if state != connectivity.Ready {
5727		t.Fatalf("Want connection state to be Ready, got %v", state)
5728	}
5729	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
5730	defer cancel()
5731	// Make a fail-fast RPC.
5732	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5733		t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err)
5734	}
5735}
5736
5737type errCodec struct {
5738	noError bool
5739}
5740
5741func (c *errCodec) Marshal(v interface{}) ([]byte, error) {
5742	if c.noError {
5743		return []byte{}, nil
5744	}
5745	return nil, fmt.Errorf("3987^12 + 4365^12 = 4472^12")
5746}
5747
5748func (c *errCodec) Unmarshal(data []byte, v interface{}) error {
5749	return nil
5750}
5751
5752func (c *errCodec) Name() string {
5753	return "Fermat's near-miss."
5754}
5755
5756func (s) TestEncodeDoesntPanic(t *testing.T) {
5757	for _, e := range listTestEnv() {
5758		testEncodeDoesntPanic(t, e)
5759	}
5760}
5761
5762func testEncodeDoesntPanic(t *testing.T, e env) {
5763	te := newTest(t, e)
5764	erc := &errCodec{}
5765	te.customCodec = erc
5766	te.startServer(&testServer{security: e.security})
5767	defer te.tearDown()
5768	te.customCodec = nil
5769	tc := testpb.NewTestServiceClient(te.clientConn())
5770	// Failure case, should not panic.
5771	tc.EmptyCall(context.Background(), &testpb.Empty{})
5772	erc.noError = true
5773	// Passing case.
5774	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
5775		t.Fatalf("EmptyCall(_, _) = _, %v, want _, <nil>", err)
5776	}
5777}
5778
5779func (s) TestSvrWriteStatusEarlyWrite(t *testing.T) {
5780	for _, e := range listTestEnv() {
5781		testSvrWriteStatusEarlyWrite(t, e)
5782	}
5783}
5784
5785func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
5786	te := newTest(t, e)
5787	const smallSize = 1024
5788	const largeSize = 2048
5789	const extraLargeSize = 4096
5790	te.maxServerReceiveMsgSize = newInt(largeSize)
5791	te.maxServerSendMsgSize = newInt(largeSize)
5792	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
5793	if err != nil {
5794		t.Fatal(err)
5795	}
5796	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
5797	if err != nil {
5798		t.Fatal(err)
5799	}
5800	te.startServer(&testServer{security: e.security})
5801	defer te.tearDown()
5802	tc := testpb.NewTestServiceClient(te.clientConn())
5803	respParam := []*testpb.ResponseParameters{
5804		{
5805			Size: int32(smallSize),
5806		},
5807	}
5808	sreq := &testpb.StreamingOutputCallRequest{
5809		ResponseType:       testpb.PayloadType_COMPRESSABLE,
5810		ResponseParameters: respParam,
5811		Payload:            extraLargePayload,
5812	}
5813	// Test recv case: server receives a message larger than maxServerReceiveMsgSize.
5814	stream, err := tc.FullDuplexCall(te.ctx)
5815	if err != nil {
5816		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5817	}
5818	if err = stream.Send(sreq); err != nil {
5819		t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
5820	}
5821	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
5822		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5823	}
5824	// Test send case: server sends a message larger than maxServerSendMsgSize.
5825	sreq.Payload = smallPayload
5826	respParam[0].Size = int32(extraLargeSize)
5827
5828	stream, err = tc.FullDuplexCall(te.ctx)
5829	if err != nil {
5830		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
5831	}
5832	if err = stream.Send(sreq); err != nil {
5833		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
5834	}
5835	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
5836		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
5837	}
5838}
5839
5840// The following functions with function name ending with TD indicates that they
5841// should be deleted after old service config API is deprecated and deleted.
5842func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) {
5843	te := newTest(t, e)
5844	// We write before read.
5845	ch := make(chan grpc.ServiceConfig, 1)
5846	te.sc = ch
5847	te.userAgent = testAppUA
5848	te.declareLogNoise(
5849		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
5850		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
5851		"grpc: addrConn.resetTransport failed to create client transport: connection error",
5852		"Failed to dial : context canceled; please retry.",
5853	)
5854	return te, ch
5855}
5856
5857func (s) TestServiceConfigGetMethodConfigTD(t *testing.T) {
5858	for _, e := range listTestEnv() {
5859		testGetMethodConfigTD(t, e)
5860	}
5861}
5862
5863func testGetMethodConfigTD(t *testing.T, e env) {
5864	te, ch := testServiceConfigSetupTD(t, e)
5865	defer te.tearDown()
5866
5867	mc1 := grpc.MethodConfig{
5868		WaitForReady: newBool(true),
5869		Timeout:      newDuration(time.Millisecond),
5870	}
5871	mc2 := grpc.MethodConfig{WaitForReady: newBool(false)}
5872	m := make(map[string]grpc.MethodConfig)
5873	m["/grpc.testing.TestService/EmptyCall"] = mc1
5874	m["/grpc.testing.TestService/"] = mc2
5875	sc := grpc.ServiceConfig{
5876		Methods: m,
5877	}
5878	ch <- sc
5879
5880	cc := te.clientConn()
5881	tc := testpb.NewTestServiceClient(cc)
5882	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5883	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
5884		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5885	}
5886
5887	m = make(map[string]grpc.MethodConfig)
5888	m["/grpc.testing.TestService/UnaryCall"] = mc1
5889	m["/grpc.testing.TestService/"] = mc2
5890	sc = grpc.ServiceConfig{
5891		Methods: m,
5892	}
5893	ch <- sc
5894	// Wait for the new service config to propagate.
5895	for {
5896		if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded {
5897			continue
5898		}
5899		break
5900	}
5901	// The following RPCs are expected to become fail-fast.
5902	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.Unavailable {
5903		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
5904	}
5905}
5906
5907func (s) TestServiceConfigWaitForReadyTD(t *testing.T) {
5908	for _, e := range listTestEnv() {
5909		testServiceConfigWaitForReadyTD(t, e)
5910	}
5911}
5912
5913func testServiceConfigWaitForReadyTD(t *testing.T, e env) {
5914	te, ch := testServiceConfigSetupTD(t, e)
5915	defer te.tearDown()
5916
5917	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
5918	mc := grpc.MethodConfig{
5919		WaitForReady: newBool(false),
5920		Timeout:      newDuration(time.Millisecond),
5921	}
5922	m := make(map[string]grpc.MethodConfig)
5923	m["/grpc.testing.TestService/EmptyCall"] = mc
5924	m["/grpc.testing.TestService/FullDuplexCall"] = mc
5925	sc := grpc.ServiceConfig{
5926		Methods: m,
5927	}
5928	ch <- sc
5929
5930	cc := te.clientConn()
5931	tc := testpb.NewTestServiceClient(cc)
5932	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5933	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
5934		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5935	}
5936	if _, err := tc.FullDuplexCall(context.Background(), grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
5937		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
5938	}
5939
5940	// Generate a service config update.
5941	// Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
5942	mc.WaitForReady = newBool(true)
5943	m = make(map[string]grpc.MethodConfig)
5944	m["/grpc.testing.TestService/EmptyCall"] = mc
5945	m["/grpc.testing.TestService/FullDuplexCall"] = mc
5946	sc = grpc.ServiceConfig{
5947		Methods: m,
5948	}
5949	ch <- sc
5950
5951	// Wait for the new service config to take effect.
5952	mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
5953	for {
5954		if !*mc.WaitForReady {
5955			time.Sleep(100 * time.Millisecond)
5956			mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall")
5957			continue
5958		}
5959		break
5960	}
5961	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
5962	if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
5963		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5964	}
5965	if _, err := tc.FullDuplexCall(context.Background()); status.Code(err) != codes.DeadlineExceeded {
5966		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
5967	}
5968}
5969
5970func (s) TestServiceConfigTimeoutTD(t *testing.T) {
5971	for _, e := range listTestEnv() {
5972		testServiceConfigTimeoutTD(t, e)
5973	}
5974}
5975
5976func testServiceConfigTimeoutTD(t *testing.T, e env) {
5977	te, ch := testServiceConfigSetupTD(t, e)
5978	defer te.tearDown()
5979
5980	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
5981	mc := grpc.MethodConfig{
5982		Timeout: newDuration(time.Hour),
5983	}
5984	m := make(map[string]grpc.MethodConfig)
5985	m["/grpc.testing.TestService/EmptyCall"] = mc
5986	m["/grpc.testing.TestService/FullDuplexCall"] = mc
5987	sc := grpc.ServiceConfig{
5988		Methods: m,
5989	}
5990	ch <- sc
5991
5992	cc := te.clientConn()
5993	tc := testpb.NewTestServiceClient(cc)
5994	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
5995	ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
5996	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
5997		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5998	}
5999	cancel()
6000	ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond)
6001	if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
6002		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
6003	}
6004	cancel()
6005
6006	// Generate a service config update.
6007	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
6008	mc.Timeout = newDuration(time.Nanosecond)
6009	m = make(map[string]grpc.MethodConfig)
6010	m["/grpc.testing.TestService/EmptyCall"] = mc
6011	m["/grpc.testing.TestService/FullDuplexCall"] = mc
6012	sc = grpc.ServiceConfig{
6013		Methods: m,
6014	}
6015	ch <- sc
6016
6017	// Wait for the new service config to take effect.
6018	mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
6019	for {
6020		if *mc.Timeout != time.Nanosecond {
6021			time.Sleep(100 * time.Millisecond)
6022			mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall")
6023			continue
6024		}
6025		break
6026	}
6027
6028	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
6029	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
6030		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
6031	}
6032	cancel()
6033
6034	ctx, cancel = context.WithTimeout(context.Background(), time.Hour)
6035	if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
6036		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
6037	}
6038	cancel()
6039}
6040
6041func (s) TestServiceConfigMaxMsgSizeTD(t *testing.T) {
6042	for _, e := range listTestEnv() {
6043		testServiceConfigMaxMsgSizeTD(t, e)
6044	}
6045}
6046
6047func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
6048	// Setting up values and objects shared across all test cases.
6049	const smallSize = 1
6050	const largeSize = 1024
6051	const extraLargeSize = 2048
6052
6053	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
6054	if err != nil {
6055		t.Fatal(err)
6056	}
6057	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
6058	if err != nil {
6059		t.Fatal(err)
6060	}
6061	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
6062	if err != nil {
6063		t.Fatal(err)
6064	}
6065
6066	mc := grpc.MethodConfig{
6067		MaxReqSize:  newInt(extraLargeSize),
6068		MaxRespSize: newInt(extraLargeSize),
6069	}
6070
6071	m := make(map[string]grpc.MethodConfig)
6072	m["/grpc.testing.TestService/UnaryCall"] = mc
6073	m["/grpc.testing.TestService/FullDuplexCall"] = mc
6074	sc := grpc.ServiceConfig{
6075		Methods: m,
6076	}
6077	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
6078	te1, ch1 := testServiceConfigSetupTD(t, e)
6079	te1.startServer(&testServer{security: e.security})
6080	defer te1.tearDown()
6081
6082	ch1 <- sc
6083	tc := testpb.NewTestServiceClient(te1.clientConn())
6084
6085	req := &testpb.SimpleRequest{
6086		ResponseType: testpb.PayloadType_COMPRESSABLE,
6087		ResponseSize: int32(extraLargeSize),
6088		Payload:      smallPayload,
6089	}
6090	// Test for unary RPC recv.
6091	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6092		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6093	}
6094
6095	// Test for unary RPC send.
6096	req.Payload = extraLargePayload
6097	req.ResponseSize = int32(smallSize)
6098	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6099		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6100	}
6101
6102	// Test for streaming RPC recv.
6103	respParam := []*testpb.ResponseParameters{
6104		{
6105			Size: int32(extraLargeSize),
6106		},
6107	}
6108	sreq := &testpb.StreamingOutputCallRequest{
6109		ResponseType:       testpb.PayloadType_COMPRESSABLE,
6110		ResponseParameters: respParam,
6111		Payload:            smallPayload,
6112	}
6113	stream, err := tc.FullDuplexCall(te1.ctx)
6114	if err != nil {
6115		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6116	}
6117	if err := stream.Send(sreq); err != nil {
6118		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6119	}
6120	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
6121		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
6122	}
6123
6124	// Test for streaming RPC send.
6125	respParam[0].Size = int32(smallSize)
6126	sreq.Payload = extraLargePayload
6127	stream, err = tc.FullDuplexCall(te1.ctx)
6128	if err != nil {
6129		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6130	}
6131	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
6132		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
6133	}
6134
6135	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
6136	te2, ch2 := testServiceConfigSetupTD(t, e)
6137	te2.maxClientReceiveMsgSize = newInt(1024)
6138	te2.maxClientSendMsgSize = newInt(1024)
6139	te2.startServer(&testServer{security: e.security})
6140	defer te2.tearDown()
6141	ch2 <- sc
6142	tc = testpb.NewTestServiceClient(te2.clientConn())
6143
6144	// Test for unary RPC recv.
6145	req.Payload = smallPayload
6146	req.ResponseSize = int32(largeSize)
6147
6148	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6149		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6150	}
6151
6152	// Test for unary RPC send.
6153	req.Payload = largePayload
6154	req.ResponseSize = int32(smallSize)
6155	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6156		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6157	}
6158
6159	// Test for streaming RPC recv.
6160	stream, err = tc.FullDuplexCall(te2.ctx)
6161	respParam[0].Size = int32(largeSize)
6162	sreq.Payload = smallPayload
6163	if err != nil {
6164		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6165	}
6166	if err := stream.Send(sreq); err != nil {
6167		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6168	}
6169	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
6170		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
6171	}
6172
6173	// Test for streaming RPC send.
6174	respParam[0].Size = int32(smallSize)
6175	sreq.Payload = largePayload
6176	stream, err = tc.FullDuplexCall(te2.ctx)
6177	if err != nil {
6178		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6179	}
6180	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
6181		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
6182	}
6183
6184	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
6185	te3, ch3 := testServiceConfigSetupTD(t, e)
6186	te3.maxClientReceiveMsgSize = newInt(4096)
6187	te3.maxClientSendMsgSize = newInt(4096)
6188	te3.startServer(&testServer{security: e.security})
6189	defer te3.tearDown()
6190	ch3 <- sc
6191	tc = testpb.NewTestServiceClient(te3.clientConn())
6192
6193	// Test for unary RPC recv.
6194	req.Payload = smallPayload
6195	req.ResponseSize = int32(largeSize)
6196
6197	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
6198		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
6199	}
6200
6201	req.ResponseSize = int32(extraLargeSize)
6202	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6203		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6204	}
6205
6206	// Test for unary RPC send.
6207	req.Payload = largePayload
6208	req.ResponseSize = int32(smallSize)
6209	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
6210		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
6211	}
6212
6213	req.Payload = extraLargePayload
6214	if _, err := tc.UnaryCall(context.Background(), req); err == nil || status.Code(err) != codes.ResourceExhausted {
6215		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
6216	}
6217
6218	// Test for streaming RPC recv.
6219	stream, err = tc.FullDuplexCall(te3.ctx)
6220	if err != nil {
6221		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6222	}
6223	respParam[0].Size = int32(largeSize)
6224	sreq.Payload = smallPayload
6225
6226	if err := stream.Send(sreq); err != nil {
6227		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6228	}
6229	if _, err := stream.Recv(); err != nil {
6230		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
6231	}
6232
6233	respParam[0].Size = int32(extraLargeSize)
6234
6235	if err := stream.Send(sreq); err != nil {
6236		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6237	}
6238	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
6239		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
6240	}
6241
6242	// Test for streaming RPC send.
6243	respParam[0].Size = int32(smallSize)
6244	sreq.Payload = largePayload
6245	stream, err = tc.FullDuplexCall(te3.ctx)
6246	if err != nil {
6247		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6248	}
6249	if err := stream.Send(sreq); err != nil {
6250		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6251	}
6252	sreq.Payload = extraLargePayload
6253	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
6254		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
6255	}
6256}
6257
6258func (s) TestMethodFromServerStream(t *testing.T) {
6259	const testMethod = "/package.service/method"
6260	e := tcpClearRREnv
6261	te := newTest(t, e)
6262	var method string
6263	var ok bool
6264	te.unknownHandler = func(srv interface{}, stream grpc.ServerStream) error {
6265		method, ok = grpc.MethodFromServerStream(stream)
6266		return nil
6267	}
6268
6269	te.startServer(nil)
6270	defer te.tearDown()
6271	_ = te.clientConn().Invoke(context.Background(), testMethod, nil, nil)
6272	if !ok || method != testMethod {
6273		t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)
6274	}
6275}
6276
6277func (s) TestInterceptorCanAccessCallOptions(t *testing.T) {
6278	e := tcpClearRREnv
6279	te := newTest(t, e)
6280	te.startServer(&testServer{security: e.security})
6281	defer te.tearDown()
6282
6283	type observedOptions struct {
6284		headers     []*metadata.MD
6285		trailers    []*metadata.MD
6286		peer        []*peer.Peer
6287		creds       []credentials.PerRPCCredentials
6288		failFast    []bool
6289		maxRecvSize []int
6290		maxSendSize []int
6291		compressor  []string
6292		subtype     []string
6293	}
6294	var observedOpts observedOptions
6295	populateOpts := func(opts []grpc.CallOption) {
6296		for _, o := range opts {
6297			switch o := o.(type) {
6298			case grpc.HeaderCallOption:
6299				observedOpts.headers = append(observedOpts.headers, o.HeaderAddr)
6300			case grpc.TrailerCallOption:
6301				observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr)
6302			case grpc.PeerCallOption:
6303				observedOpts.peer = append(observedOpts.peer, o.PeerAddr)
6304			case grpc.PerRPCCredsCallOption:
6305				observedOpts.creds = append(observedOpts.creds, o.Creds)
6306			case grpc.FailFastCallOption:
6307				observedOpts.failFast = append(observedOpts.failFast, o.FailFast)
6308			case grpc.MaxRecvMsgSizeCallOption:
6309				observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize)
6310			case grpc.MaxSendMsgSizeCallOption:
6311				observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize)
6312			case grpc.CompressorCallOption:
6313				observedOpts.compressor = append(observedOpts.compressor, o.CompressorType)
6314			case grpc.ContentSubtypeCallOption:
6315				observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype)
6316			}
6317		}
6318	}
6319
6320	te.unaryClientInt = func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
6321		populateOpts(opts)
6322		return nil
6323	}
6324	te.streamClientInt = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
6325		populateOpts(opts)
6326		return nil, nil
6327	}
6328
6329	defaults := []grpc.CallOption{
6330		grpc.WaitForReady(true),
6331		grpc.MaxCallRecvMsgSize(1010),
6332	}
6333	tc := testpb.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))
6334
6335	var headers metadata.MD
6336	var trailers metadata.MD
6337	var pr peer.Peer
6338	tc.UnaryCall(context.Background(), &testpb.SimpleRequest{},
6339		grpc.MaxCallRecvMsgSize(100),
6340		grpc.MaxCallSendMsgSize(200),
6341		grpc.PerRPCCredentials(testPerRPCCredentials{}),
6342		grpc.Header(&headers),
6343		grpc.Trailer(&trailers),
6344		grpc.Peer(&pr))
6345	expected := observedOptions{
6346		failFast:    []bool{false},
6347		maxRecvSize: []int{1010, 100},
6348		maxSendSize: []int{200},
6349		creds:       []credentials.PerRPCCredentials{testPerRPCCredentials{}},
6350		headers:     []*metadata.MD{&headers},
6351		trailers:    []*metadata.MD{&trailers},
6352		peer:        []*peer.Peer{&pr},
6353	}
6354
6355	if !reflect.DeepEqual(expected, observedOpts) {
6356		t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
6357	}
6358
6359	observedOpts = observedOptions{} // reset
6360
6361	tc.StreamingInputCall(context.Background(),
6362		grpc.WaitForReady(false),
6363		grpc.MaxCallSendMsgSize(2020),
6364		grpc.UseCompressor("comp-type"),
6365		grpc.CallContentSubtype("json"))
6366	expected = observedOptions{
6367		failFast:    []bool{false, true},
6368		maxRecvSize: []int{1010},
6369		maxSendSize: []int{2020},
6370		compressor:  []string{"comp-type"},
6371		subtype:     []string{"json"},
6372	}
6373
6374	if !reflect.DeepEqual(expected, observedOpts) {
6375		t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
6376	}
6377}
6378
6379func (s) TestCompressorRegister(t *testing.T) {
6380	for _, e := range listTestEnv() {
6381		testCompressorRegister(t, e)
6382	}
6383}
6384
6385func testCompressorRegister(t *testing.T, e env) {
6386	te := newTest(t, e)
6387	te.clientCompression = false
6388	te.serverCompression = false
6389	te.clientUseCompression = true
6390
6391	te.startServer(&testServer{security: e.security})
6392	defer te.tearDown()
6393	tc := testpb.NewTestServiceClient(te.clientConn())
6394
6395	// Unary call
6396	const argSize = 271828
6397	const respSize = 314159
6398	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
6399	if err != nil {
6400		t.Fatal(err)
6401	}
6402	req := &testpb.SimpleRequest{
6403		ResponseType: testpb.PayloadType_COMPRESSABLE,
6404		ResponseSize: respSize,
6405		Payload:      payload,
6406	}
6407	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs("something", "something"))
6408	if _, err := tc.UnaryCall(ctx, req); err != nil {
6409		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
6410	}
6411	// Streaming RPC
6412	ctx, cancel := context.WithCancel(context.Background())
6413	defer cancel()
6414	stream, err := tc.FullDuplexCall(ctx)
6415	if err != nil {
6416		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
6417	}
6418	respParam := []*testpb.ResponseParameters{
6419		{
6420			Size: 31415,
6421		},
6422	}
6423	payload, err = newPayload(testpb.PayloadType_COMPRESSABLE, int32(31415))
6424	if err != nil {
6425		t.Fatal(err)
6426	}
6427	sreq := &testpb.StreamingOutputCallRequest{
6428		ResponseType:       testpb.PayloadType_COMPRESSABLE,
6429		ResponseParameters: respParam,
6430		Payload:            payload,
6431	}
6432	if err := stream.Send(sreq); err != nil {
6433		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
6434	}
6435	if _, err := stream.Recv(); err != nil {
6436		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
6437	}
6438}
6439
6440func (s) TestServeExitsWhenListenerClosed(t *testing.T) {
6441
6442	ss := &stubServer{
6443		emptyCall: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
6444			return &testpb.Empty{}, nil
6445		},
6446	}
6447
6448	s := grpc.NewServer()
6449	testpb.RegisterTestServiceServer(s, ss)
6450
6451	lis, err := net.Listen("tcp", "localhost:0")
6452	if err != nil {
6453		t.Fatalf("Failed to create listener: %v", err)
6454	}
6455
6456	done := make(chan struct{})
6457	go func() {
6458		s.Serve(lis)
6459		close(done)
6460	}()
6461
6462	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
6463	if err != nil {
6464		t.Fatalf("Failed to dial server: %v", err)
6465	}
6466	defer cc.Close()
6467	c := testpb.NewTestServiceClient(cc)
6468	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6469	defer cancel()
6470	if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
6471		t.Fatalf("Failed to send test RPC to server: %v", err)
6472	}
6473
6474	if err := lis.Close(); err != nil {
6475		t.Fatalf("Failed to close listener: %v", err)
6476	}
6477	const timeout = 5 * time.Second
6478	timer := time.NewTimer(timeout)
6479	select {
6480	case <-done:
6481		return
6482	case <-timer.C:
6483		t.Fatalf("Serve did not return after %v", timeout)
6484	}
6485}
6486
6487// Service handler returns status with invalid utf8 message.
6488func (s) TestStatusInvalidUTF8Message(t *testing.T) {
6489
6490	var (
6491		origMsg = string([]byte{0xff, 0xfe, 0xfd})
6492		wantMsg = "���"
6493	)
6494
6495	ss := &stubServer{
6496		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
6497			return nil, status.Errorf(codes.Internal, origMsg)
6498		},
6499	}
6500	if err := ss.Start(nil); err != nil {
6501		t.Fatalf("Error starting endpoint server: %v", err)
6502	}
6503	defer ss.Stop()
6504
6505	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
6506	defer cancel()
6507
6508	if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg {
6509		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg)
6510	}
6511}
6512
6513// Service handler returns status with details and invalid utf8 message. Proto
6514// will fail to marshal the status because of the invalid utf8 message. Details
6515// will be dropped when sending.
6516func (s) TestStatusInvalidUTF8Details(t *testing.T) {
6517
6518	var (
6519		origMsg = string([]byte{0xff, 0xfe, 0xfd})
6520		wantMsg = "���"
6521	)
6522
6523	ss := &stubServer{
6524		emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
6525			st := status.New(codes.Internal, origMsg)
6526			st, err := st.WithDetails(&testpb.Empty{})
6527			if err != nil {
6528				return nil, err
6529			}
6530			return nil, st.Err()
6531		},
6532	}
6533	if err := ss.Start(nil); err != nil {
6534		t.Fatalf("Error starting endpoint server: %v", err)
6535	}
6536	defer ss.Stop()
6537
6538	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
6539	defer cancel()
6540
6541	_, err := ss.client.EmptyCall(ctx, &testpb.Empty{})
6542	st := status.Convert(err)
6543	if st.Message() != wantMsg {
6544		t.Fatalf("ss.client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg)
6545	}
6546	if len(st.Details()) != 0 {
6547		// Details should be dropped on the server side.
6548		t.Fatalf("RPC status contain details: %v, want no details", st.Details())
6549	}
6550}
6551
6552func (s) TestClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T) {
6553	for _, e := range listTestEnv() {
6554		if e.httpHandler {
6555			continue
6556		}
6557		testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t, e)
6558	}
6559}
6560
6561func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e env) {
6562	te := newTest(t, e)
6563	te.userAgent = testAppUA
6564	smallSize := 1024
6565	te.maxServerReceiveMsgSize = &smallSize
6566	te.startServer(&testServer{security: e.security})
6567	defer te.tearDown()
6568	tc := testpb.NewTestServiceClient(te.clientConn())
6569	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576)
6570	if err != nil {
6571		t.Fatal(err)
6572	}
6573	req := &testpb.SimpleRequest{
6574		ResponseType: testpb.PayloadType_COMPRESSABLE,
6575		Payload:      payload,
6576	}
6577	var wg sync.WaitGroup
6578	for i := 0; i < 10; i++ {
6579		wg.Add(1)
6580		go func() {
6581			defer wg.Done()
6582			for j := 0; j < 100; j++ {
6583				ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
6584				defer cancel()
6585				if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.ResourceExhausted {
6586					t.Errorf("TestService/UnaryCall(_,_) = _. %v, want code: %s", err, codes.ResourceExhausted)
6587					return
6588				}
6589			}
6590		}()
6591	}
6592	wg.Wait()
6593}
6594
6595const clientAlwaysFailCredErrorMsg = "clientAlwaysFailCred always fails"
6596
6597var errClientAlwaysFailCred = errors.New(clientAlwaysFailCredErrorMsg)
6598
6599type clientAlwaysFailCred struct{}
6600
6601func (c clientAlwaysFailCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
6602	return nil, nil, errClientAlwaysFailCred
6603}
6604func (c clientAlwaysFailCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
6605	return rawConn, nil, nil
6606}
6607func (c clientAlwaysFailCred) Info() credentials.ProtocolInfo {
6608	return credentials.ProtocolInfo{}
6609}
6610func (c clientAlwaysFailCred) Clone() credentials.TransportCredentials {
6611	return nil
6612}
6613func (c clientAlwaysFailCred) OverrideServerName(s string) error {
6614	return nil
6615}
6616
6617func (s) TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
6618	te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: "round_robin"})
6619	te.startServer(&testServer{security: te.e.security})
6620	defer te.tearDown()
6621
6622	opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})}
6623	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
6624	defer cancel()
6625	cc, err := grpc.DialContext(ctx, te.srvAddr, opts...)
6626	if err != nil {
6627		t.Fatalf("Dial(_) = %v, want %v", err, nil)
6628	}
6629	defer cc.Close()
6630
6631	tc := testpb.NewTestServiceClient(cc)
6632	for i := 0; i < 1000; i++ {
6633		// This loop runs for at most 1 second. The first several RPCs will fail
6634		// with Unavailable because the connection hasn't started. When the
6635		// first connection failed with creds error, the next RPC should also
6636		// fail with the expected error.
6637		if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); strings.Contains(err.Error(), clientAlwaysFailCredErrorMsg) {
6638			return
6639		}
6640		time.Sleep(time.Millisecond)
6641	}
6642	te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
6643}
6644
6645func (s) TestRPCTimeout(t *testing.T) {
6646	for _, e := range listTestEnv() {
6647		testRPCTimeout(t, e)
6648	}
6649}
6650
6651func testRPCTimeout(t *testing.T, e env) {
6652	te := newTest(t, e)
6653	te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
6654	defer te.tearDown()
6655
6656	cc := te.clientConn()
6657	tc := testpb.NewTestServiceClient(cc)
6658
6659	const argSize = 2718
6660	const respSize = 314
6661
6662	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
6663	if err != nil {
6664		t.Fatal(err)
6665	}
6666
6667	req := &testpb.SimpleRequest{
6668		ResponseType: testpb.PayloadType_COMPRESSABLE,
6669		ResponseSize: respSize,
6670		Payload:      payload,
6671	}
6672	for i := -1; i <= 10; i++ {
6673		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
6674		if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
6675			t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
6676		}
6677		cancel()
6678	}
6679}
6680
6681func (s) TestDisabledIOBuffers(t *testing.T) {
6682
6683	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
6684	if err != nil {
6685		t.Fatalf("Failed to create payload: %v", err)
6686	}
6687	req := &testpb.StreamingOutputCallRequest{
6688		Payload: payload,
6689	}
6690	resp := &testpb.StreamingOutputCallResponse{
6691		Payload: payload,
6692	}
6693
6694	ss := &stubServer{
6695		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
6696			for {
6697				in, err := stream.Recv()
6698				if err == io.EOF {
6699					return nil
6700				}
6701				if err != nil {
6702					t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
6703					return err
6704				}
6705				if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
6706					t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
6707					return err
6708				}
6709				if err := stream.Send(resp); err != nil {
6710					t.Errorf("stream.Send(_)= %v, want <nil>", err)
6711					return err
6712				}
6713
6714			}
6715		},
6716	}
6717
6718	s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
6719	testpb.RegisterTestServiceServer(s, ss)
6720
6721	lis, err := net.Listen("tcp", "localhost:0")
6722	if err != nil {
6723		t.Fatalf("Failed to create listener: %v", err)
6724	}
6725
6726	done := make(chan struct{})
6727	go func() {
6728		s.Serve(lis)
6729		close(done)
6730	}()
6731	defer s.Stop()
6732	dctx, dcancel := context.WithTimeout(context.Background(), 5*time.Second)
6733	defer dcancel()
6734	cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
6735	if err != nil {
6736		t.Fatalf("Failed to dial server")
6737	}
6738	defer cc.Close()
6739	c := testpb.NewTestServiceClient(cc)
6740	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6741	defer cancel()
6742	stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true))
6743	if err != nil {
6744		t.Fatalf("Failed to send test RPC to server")
6745	}
6746	for i := 0; i < 10; i++ {
6747		if err := stream.Send(req); err != nil {
6748			t.Fatalf("stream.Send(_) = %v, want <nil>", err)
6749		}
6750		in, err := stream.Recv()
6751		if err != nil {
6752			t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
6753		}
6754		if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
6755			t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
6756		}
6757	}
6758	stream.CloseSend()
6759	if _, err := stream.Recv(); err != io.EOF {
6760		t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
6761	}
6762}
6763
6764func (s) TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) {
6765	for _, e := range listTestEnv() {
6766		if e.httpHandler {
6767			continue
6768		}
6769		testServerMaxHeaderListSizeClientUserViolation(t, e)
6770	}
6771}
6772
6773func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) {
6774	te := newTest(t, e)
6775	te.maxServerHeaderListSize = new(uint32)
6776	*te.maxServerHeaderListSize = 216
6777	te.startServer(&testServer{security: e.security})
6778	defer te.tearDown()
6779
6780	cc := te.clientConn()
6781	tc := testpb.NewTestServiceClient(cc)
6782	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6783	defer cancel()
6784	metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216)))
6785	var err error
6786	if err = verifyResultWithDelay(func() (bool, error) {
6787		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
6788			return true, nil
6789		}
6790		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
6791	}); err != nil {
6792		t.Fatal(err)
6793	}
6794}
6795
6796func (s) TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) {
6797	for _, e := range listTestEnv() {
6798		if e.httpHandler {
6799			continue
6800		}
6801		testClientMaxHeaderListSizeServerUserViolation(t, e)
6802	}
6803}
6804
6805func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) {
6806	te := newTest(t, e)
6807	te.maxClientHeaderListSize = new(uint32)
6808	*te.maxClientHeaderListSize = 1 // any header server sends will violate
6809	te.startServer(&testServer{security: e.security})
6810	defer te.tearDown()
6811
6812	cc := te.clientConn()
6813	tc := testpb.NewTestServiceClient(cc)
6814	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6815	defer cancel()
6816	var err error
6817	if err = verifyResultWithDelay(func() (bool, error) {
6818		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
6819			return true, nil
6820		}
6821		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
6822	}); err != nil {
6823		t.Fatal(err)
6824	}
6825}
6826
6827func (s) TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) {
6828	for _, e := range listTestEnv() {
6829		if e.httpHandler || e.security == "tls" {
6830			continue
6831		}
6832		testServerMaxHeaderListSizeClientIntentionalViolation(t, e)
6833	}
6834}
6835
6836func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) {
6837	te := newTest(t, e)
6838	te.maxServerHeaderListSize = new(uint32)
6839	*te.maxServerHeaderListSize = 512
6840	te.startServer(&testServer{security: e.security})
6841	defer te.tearDown()
6842
6843	cc, dw := te.clientConnWithConnControl()
6844	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
6845	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6846	defer cancel()
6847	stream, err := tc.FullDuplexCall(ctx)
6848	if err != nil {
6849		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
6850	}
6851	rcw := dw.getRawConnWrapper()
6852	val := make([]string, 512)
6853	for i := range val {
6854		val[i] = "a"
6855	}
6856	// allow for client to send the initial header
6857	time.Sleep(100 * time.Millisecond)
6858	rcw.writeHeaders(http2.HeadersFrameParam{
6859		StreamID:      tc.getCurrentStreamID(),
6860		BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")),
6861		EndStream:     false,
6862		EndHeaders:    true,
6863	})
6864	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
6865		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
6866	}
6867}
6868
6869func (s) TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) {
6870	for _, e := range listTestEnv() {
6871		if e.httpHandler || e.security == "tls" {
6872			continue
6873		}
6874		testClientMaxHeaderListSizeServerIntentionalViolation(t, e)
6875	}
6876}
6877
6878func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) {
6879	te := newTest(t, e)
6880	te.maxClientHeaderListSize = new(uint32)
6881	*te.maxClientHeaderListSize = 200
6882	lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true})
6883	defer te.tearDown()
6884	cc, _ := te.clientConnWithConnControl()
6885	tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
6886	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6887	defer cancel()
6888	stream, err := tc.FullDuplexCall(ctx)
6889	if err != nil {
6890		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
6891	}
6892	var i int
6893	var rcw *rawConnWrapper
6894	for i = 0; i < 100; i++ {
6895		rcw = lw.getLastConn()
6896		if rcw != nil {
6897			break
6898		}
6899		time.Sleep(10 * time.Millisecond)
6900		continue
6901	}
6902	if i == 100 {
6903		t.Fatalf("failed to create server transport after 1s")
6904	}
6905
6906	val := make([]string, 200)
6907	for i := range val {
6908		val[i] = "a"
6909	}
6910	// allow for client to send the initial header.
6911	time.Sleep(100 * time.Millisecond)
6912	rcw.writeHeaders(http2.HeadersFrameParam{
6913		StreamID:      tc.getCurrentStreamID(),
6914		BlockFragment: rcw.encodeRawHeader("oversize", strings.Join(val, "")),
6915		EndStream:     false,
6916		EndHeaders:    true,
6917	})
6918	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
6919		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
6920	}
6921}
6922
6923func (s) TestNetPipeConn(t *testing.T) {
6924	// This test will block indefinitely if grpc writes both client and server
6925	// prefaces without either reading from the Conn.
6926	pl := testutils.NewPipeListener()
6927	s := grpc.NewServer()
6928	defer s.Stop()
6929	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6930		return &testpb.SimpleResponse{}, nil
6931	}}
6932	testpb.RegisterTestServiceServer(s, ts)
6933	go s.Serve(pl)
6934	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
6935	defer cancel()
6936	cc, err := grpc.DialContext(ctx, "", grpc.WithInsecure(), grpc.WithDialer(pl.Dialer()))
6937	if err != nil {
6938		t.Fatalf("Error creating client: %v", err)
6939	}
6940	defer cc.Close()
6941	client := testpb.NewTestServiceClient(cc)
6942	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6943		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
6944	}
6945}
6946
6947func (s) TestLargeTimeout(t *testing.T) {
6948	for _, e := range listTestEnv() {
6949		testLargeTimeout(t, e)
6950	}
6951}
6952
6953func testLargeTimeout(t *testing.T, e env) {
6954	te := newTest(t, e)
6955	te.declareLogNoise("Server.processUnaryRPC failed to write status")
6956
6957	ts := &funcServer{}
6958	te.startServer(ts)
6959	defer te.tearDown()
6960	tc := testpb.NewTestServiceClient(te.clientConn())
6961
6962	timeouts := []time.Duration{
6963		time.Duration(math.MaxInt64), // will be (correctly) converted to
6964		// 2562048 hours, which overflows upon converting back to an int64
6965		2562047 * time.Hour, // the largest timeout that does not overflow
6966	}
6967
6968	for i, maxTimeout := range timeouts {
6969		ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6970			deadline, ok := ctx.Deadline()
6971			timeout := time.Until(deadline)
6972			minTimeout := maxTimeout - 5*time.Second
6973			if !ok || timeout < minTimeout || timeout > maxTimeout {
6974				t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
6975				return nil, status.Error(codes.OutOfRange, "deadline error")
6976			}
6977			return &testpb.SimpleResponse{}, nil
6978		}
6979
6980		ctx, cancel := context.WithTimeout(context.Background(), maxTimeout)
6981		defer cancel()
6982
6983		if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6984			t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err)
6985		}
6986	}
6987}
6988
6989// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
6990// test ensures that the connection is re-created after GO_AWAY and not affected by the
6991// subsequent (old) connection closure.
6992func (s) TestGoAwayThenClose(t *testing.T) {
6993
6994	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
6995	defer cancel()
6996
6997	lis1, err := net.Listen("tcp", "localhost:0")
6998	if err != nil {
6999		t.Fatalf("Error while listening. Err: %v", err)
7000	}
7001	s1 := grpc.NewServer()
7002	defer s1.Stop()
7003	ts1 := &funcServer{
7004		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
7005			return &testpb.SimpleResponse{}, nil
7006		},
7007		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
7008			// Wait forever.
7009			_, err := stream.Recv()
7010			if err == nil {
7011				t.Error("expected to never receive any message")
7012			}
7013			return err
7014		},
7015	}
7016	testpb.RegisterTestServiceServer(s1, ts1)
7017	go s1.Serve(lis1)
7018
7019	conn2Established := grpcsync.NewEvent()
7020	lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
7021	if err != nil {
7022		t.Fatalf("Error while listening. Err: %v", err)
7023	}
7024	s2 := grpc.NewServer()
7025	defer s2.Stop()
7026	conn2Ready := grpcsync.NewEvent()
7027	ts2 := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
7028		conn2Ready.Fire()
7029		return &testpb.SimpleResponse{}, nil
7030	}}
7031	testpb.RegisterTestServiceServer(s2, ts2)
7032	go s2.Serve(lis2)
7033
7034	r, rcleanup := manual.GenerateAndRegisterManualResolver()
7035	defer rcleanup()
7036	r.InitialState(resolver.State{Addresses: []resolver.Address{
7037		{Addr: lis1.Addr().String()},
7038		{Addr: lis2.Addr().String()},
7039	}})
7040	cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
7041	if err != nil {
7042		t.Fatalf("Error creating client: %v", err)
7043	}
7044	defer cc.Close()
7045
7046	client := testpb.NewTestServiceClient(cc)
7047
7048	// Should go on connection 1. We use a long-lived RPC because it will cause GracefulStop to send GO_AWAY, but the
7049	// connection doesn't get closed until the server stops and the client receives.
7050	stream, err := client.FullDuplexCall(ctx)
7051	if err != nil {
7052		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
7053	}
7054
7055	// Send GO_AWAY to connection 1.
7056	go s1.GracefulStop()
7057
7058	// Wait for connection 2 to be established.
7059	<-conn2Established.Done()
7060
7061	// Close connection 1.
7062	s1.Stop()
7063
7064	// Wait for client to close.
7065	_, err = stream.Recv()
7066	if err == nil {
7067		t.Fatal("expected the stream to die, but got a successful Recv")
7068	}
7069
7070	// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
7071	for i := 0; i < 10; i++ {
7072		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
7073			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
7074		}
7075	}
7076}
7077
7078func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
7079	lis, err := net.Listen(network, address)
7080	if err != nil {
7081		return nil, err
7082	}
7083	return notifyingListener{connEstablished: event, Listener: lis}, nil
7084}
7085
7086type notifyingListener struct {
7087	connEstablished *grpcsync.Event
7088	net.Listener
7089}
7090
7091func (lis notifyingListener) Accept() (net.Conn, error) {
7092	defer lis.connEstablished.Fire()
7093	return lis.Listener.Accept()
7094}
7095
7096func (s) TestRPCWaitsForResolver(t *testing.T) {
7097	te := testServiceConfigSetup(t, tcpClearRREnv)
7098	te.startServer(&testServer{security: tcpClearRREnv.security})
7099	defer te.tearDown()
7100	r, rcleanup := manual.GenerateAndRegisterManualResolver()
7101	defer rcleanup()
7102
7103	te.resolverScheme = r.Scheme()
7104	te.nonBlockingDial = true
7105	cc := te.clientConn()
7106	tc := testpb.NewTestServiceClient(cc)
7107
7108	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
7109	defer cancel()
7110	// With no resolved addresses yet, this will timeout.
7111	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
7112		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
7113	}
7114
7115	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
7116	defer cancel()
7117	go func() {
7118		time.Sleep(time.Second)
7119		r.UpdateState(resolver.State{
7120			Addresses: []resolver.Address{{Addr: te.srvAddr}},
7121			ServiceConfig: `{
7122		    "methodConfig": [
7123		        {
7124		            "name": [
7125		                {
7126		                    "service": "grpc.testing.TestService",
7127		                    "method": "UnaryCall"
7128		                }
7129		            ],
7130                    "maxRequestMessageBytes": 0
7131		        }
7132		    ]
7133		}`})
7134	}()
7135	// We wait a second before providing a service config and resolving
7136	// addresses.  So this will wait for that and then honor the
7137	// maxRequestMessageBytes it contains.
7138	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{ResponseType: testpb.PayloadType_UNCOMPRESSABLE}); status.Code(err) != codes.ResourceExhausted {
7139		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
7140	}
7141	if got := ctx.Err(); got != nil {
7142		t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got)
7143	}
7144	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
7145		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
7146	}
7147}
7148
7149func (s) TestHTTPHeaderFrameErrorHandlingHTTPMode(t *testing.T) {
7150	// Non-gRPC content-type fallback path.
7151	for httpCode := range transport.HTTPStatusConvTab {
7152		doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{
7153			":status", fmt.Sprintf("%d", httpCode),
7154			"content-type", "text/html", // non-gRPC content type to switch to HTTP mode.
7155			"grpc-status", "1", // Make up a gRPC status error
7156			"grpc-status-details-bin", "???", // Make up a gRPC field parsing error
7157		})
7158	}
7159
7160	// Missing content-type fallback path.
7161	for httpCode := range transport.HTTPStatusConvTab {
7162		doHTTPHeaderTest(t, transport.HTTPStatusConvTab[int(httpCode)], []string{
7163			":status", fmt.Sprintf("%d", httpCode),
7164			// Omitting content type to switch to HTTP mode.
7165			"grpc-status", "1", // Make up a gRPC status error
7166			"grpc-status-details-bin", "???", // Make up a gRPC field parsing error
7167		})
7168	}
7169
7170	// Malformed HTTP status when fallback.
7171	doHTTPHeaderTest(t, codes.Internal, []string{
7172		":status", "abc",
7173		// Omitting content type to switch to HTTP mode.
7174		"grpc-status", "1", // Make up a gRPC status error
7175		"grpc-status-details-bin", "???", // Make up a gRPC field parsing error
7176	})
7177}
7178
7179// Testing erroneous ResponseHeader or Trailers-only (delivered in the first HEADERS frame).
7180func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) {
7181	for _, test := range []struct {
7182		header  []string
7183		errCode codes.Code
7184	}{
7185		{
7186			// missing gRPC status.
7187			header: []string{
7188				":status", "403",
7189				"content-type", "application/grpc",
7190			},
7191			errCode: codes.Unknown,
7192		},
7193		{
7194			// malformed grpc-status.
7195			header: []string{
7196				":status", "502",
7197				"content-type", "application/grpc",
7198				"grpc-status", "abc",
7199			},
7200			errCode: codes.Internal,
7201		},
7202		{
7203			// Malformed grpc-tags-bin field.
7204			header: []string{
7205				":status", "502",
7206				"content-type", "application/grpc",
7207				"grpc-status", "0",
7208				"grpc-tags-bin", "???",
7209			},
7210			errCode: codes.Internal,
7211		},
7212		{
7213			// gRPC status error.
7214			header: []string{
7215				":status", "502",
7216				"content-type", "application/grpc",
7217				"grpc-status", "3",
7218			},
7219			errCode: codes.InvalidArgument,
7220		},
7221	} {
7222		doHTTPHeaderTest(t, test.errCode, test.header)
7223	}
7224}
7225
7226// Testing non-Trailers-only Trailers (delievered in second HEADERS frame)
7227func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) {
7228	for _, test := range []struct {
7229		responseHeader []string
7230		trailer        []string
7231		errCode        codes.Code
7232	}{
7233		{
7234			responseHeader: []string{
7235				":status", "200",
7236				"content-type", "application/grpc",
7237			},
7238			trailer: []string{
7239				// trailer missing grpc-status
7240				":status", "502",
7241			},
7242			errCode: codes.Unknown,
7243		},
7244		{
7245			responseHeader: []string{
7246				":status", "404",
7247				"content-type", "application/grpc",
7248			},
7249			trailer: []string{
7250				// malformed grpc-status-details-bin field
7251				"grpc-status", "0",
7252				"grpc-status-details-bin", "????",
7253			},
7254			errCode: codes.Internal,
7255		},
7256	} {
7257		doHTTPHeaderTest(t, test.errCode, test.responseHeader, test.trailer)
7258	}
7259}
7260
7261func (s) TestHTTPHeaderFrameErrorHandlingMoreThanTwoHeaders(t *testing.T) {
7262	header := []string{
7263		":status", "200",
7264		"content-type", "application/grpc",
7265	}
7266	doHTTPHeaderTest(t, codes.Internal, header, header, header)
7267}
7268
7269type httpServer struct {
7270	headerFields [][]string
7271}
7272
7273func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error {
7274	if len(headerFields)%2 == 1 {
7275		panic("odd number of kv args")
7276	}
7277
7278	var buf bytes.Buffer
7279	henc := hpack.NewEncoder(&buf)
7280	for len(headerFields) > 0 {
7281		k, v := headerFields[0], headerFields[1]
7282		headerFields = headerFields[2:]
7283		henc.WriteField(hpack.HeaderField{Name: k, Value: v})
7284	}
7285
7286	return framer.WriteHeaders(http2.HeadersFrameParam{
7287		StreamID:      sid,
7288		BlockFragment: buf.Bytes(),
7289		EndStream:     endStream,
7290		EndHeaders:    true,
7291	})
7292}
7293
7294func (s *httpServer) start(t *testing.T, lis net.Listener) {
7295	// Launch an HTTP server to send back header.
7296	go func() {
7297		conn, err := lis.Accept()
7298		if err != nil {
7299			t.Errorf("Error accepting connection: %v", err)
7300			return
7301		}
7302		defer conn.Close()
7303		// Read preface sent by client.
7304		if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil {
7305			t.Errorf("Error at server-side while reading preface from client. Err: %v", err)
7306			return
7307		}
7308		reader := bufio.NewReader(conn)
7309		writer := bufio.NewWriter(conn)
7310		framer := http2.NewFramer(writer, reader)
7311		if err = framer.WriteSettingsAck(); err != nil {
7312			t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
7313			return
7314		}
7315		writer.Flush() // necessary since client is expecting preface before declaring connection fully setup.
7316
7317		var sid uint32
7318		// Read frames until a header is received.
7319		for {
7320			frame, err := framer.ReadFrame()
7321			if err != nil {
7322				t.Errorf("Error at server-side while reading frame. Err: %v", err)
7323				return
7324			}
7325			if hframe, ok := frame.(*http2.HeadersFrame); ok {
7326				sid = hframe.Header().StreamID
7327				break
7328			}
7329		}
7330		for i, headers := range s.headerFields {
7331			if err = s.writeHeader(framer, sid, headers, i == len(s.headerFields)-1); err != nil {
7332				t.Errorf("Error at server-side while writing headers. Err: %v", err)
7333				return
7334			}
7335			writer.Flush()
7336		}
7337	}()
7338}
7339
7340func doHTTPHeaderTest(t *testing.T, errCode codes.Code, headerFields ...[]string) {
7341	lis, err := net.Listen("tcp", "localhost:0")
7342	if err != nil {
7343		t.Fatalf("Failed to listen. Err: %v", err)
7344	}
7345	defer lis.Close()
7346	server := &httpServer{
7347		headerFields: headerFields,
7348	}
7349	server.start(t, lis)
7350	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
7351	defer cancel()
7352	cc, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure())
7353	if err != nil {
7354		t.Fatalf("failed to dial due to err: %v", err)
7355	}
7356	defer cc.Close()
7357	client := testpb.NewTestServiceClient(cc)
7358	stream, err := client.FullDuplexCall(ctx)
7359	if err != nil {
7360		t.Fatalf("error creating stream due to err: %v", err)
7361	}
7362	if _, err := stream.Recv(); err == nil || status.Code(err) != errCode {
7363		t.Fatalf("stream.Recv() = _, %v, want error code: %v", err, errCode)
7364	}
7365}
7366