1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"context"
23	"fmt"
24	"io"
25	"math"
26	"net"
27	"strconv"
28	"strings"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"golang.org/x/net/http2"
34	"golang.org/x/net/http2/hpack"
35
36	"google.golang.org/grpc/codes"
37	"google.golang.org/grpc/credentials"
38	"google.golang.org/grpc/internal/channelz"
39	"google.golang.org/grpc/internal/syscall"
40	"google.golang.org/grpc/keepalive"
41	"google.golang.org/grpc/metadata"
42	"google.golang.org/grpc/peer"
43	"google.golang.org/grpc/stats"
44	"google.golang.org/grpc/status"
45)
46
47// http2Client implements the ClientTransport interface with HTTP2.
48type http2Client struct {
49	ctx        context.Context
50	cancel     context.CancelFunc
51	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
52	userAgent  string
53	md         interface{}
54	conn       net.Conn // underlying communication channel
55	loopy      *loopyWriter
56	remoteAddr net.Addr
57	localAddr  net.Addr
58	authInfo   credentials.AuthInfo // auth info about the connection
59
60	readerDone chan struct{} // sync point to enable testing.
61	writerDone chan struct{} // sync point to enable testing.
62	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
63	// that the server sent GoAway on this transport.
64	goAway chan struct{}
65	// awakenKeepalive is used to wake up keepalive when after it has gone dormant.
66	awakenKeepalive chan struct{}
67
68	framer *framer
69	// controlBuf delivers all the control related tasks (e.g., window
70	// updates, reset streams, and various settings) to the controller.
71	controlBuf *controlBuffer
72	fc         *trInFlow
73	// The scheme used: https if TLS is on, http otherwise.
74	scheme string
75
76	isSecure bool
77
78	perRPCCreds []credentials.PerRPCCredentials
79
80	// Boolean to keep track of reading activity on transport.
81	// 1 is true and 0 is false.
82	activity         uint32 // Accessed atomically.
83	kp               keepalive.ClientParameters
84	keepaliveEnabled bool
85
86	statsHandler stats.Handler
87
88	initialWindowSize int32
89
90	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
91	maxSendHeaderListSize *uint32
92
93	bdpEst *bdpEstimator
94	// onPrefaceReceipt is a callback that client transport calls upon
95	// receiving server preface to signal that a succefull HTTP2
96	// connection was established.
97	onPrefaceReceipt func()
98
99	maxConcurrentStreams  uint32
100	streamQuota           int64
101	streamsQuotaAvailable chan struct{}
102	waitingStreams        uint32
103	nextID                uint32
104
105	mu            sync.Mutex // guard the following variables
106	state         transportState
107	activeStreams map[uint32]*Stream
108	// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
109	prevGoAwayID uint32
110	// goAwayReason records the http2.ErrCode and debug data received with the
111	// GoAway frame.
112	goAwayReason GoAwayReason
113
114	// Fields below are for channelz metric collection.
115	channelzID int64 // channelz unique identification number
116	czData     *channelzData
117
118	onGoAway func(GoAwayReason)
119	onClose  func()
120
121	bufferPool *bufferPool
122}
123
124func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
125	if fn != nil {
126		return fn(ctx, addr)
127	}
128	return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
129}
130
131func isTemporary(err error) bool {
132	switch err := err.(type) {
133	case interface {
134		Temporary() bool
135	}:
136		return err.Temporary()
137	case interface {
138		Timeout() bool
139	}:
140		// Timeouts may be resolved upon retry, and are thus treated as
141		// temporary.
142		return err.Timeout()
143	}
144	return true
145}
146
147// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
148// and starts to receive messages on it. Non-nil error returns if construction
149// fails.
150func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
151	scheme := "http"
152	ctx, cancel := context.WithCancel(ctx)
153	defer func() {
154		if err != nil {
155			cancel()
156		}
157	}()
158
159	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
160	if err != nil {
161		if opts.FailOnNonTempDialError {
162			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
163		}
164		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
165	}
166	// Any further errors will close the underlying connection
167	defer func(conn net.Conn) {
168		if err != nil {
169			conn.Close()
170		}
171	}(conn)
172	kp := opts.KeepaliveParams
173	// Validate keepalive parameters.
174	if kp.Time == 0 {
175		kp.Time = defaultClientKeepaliveTime
176	}
177	if kp.Timeout == 0 {
178		kp.Timeout = defaultClientKeepaliveTimeout
179	}
180	keepaliveEnabled := false
181	if kp.Time != infinity {
182		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
183			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
184		}
185		keepaliveEnabled = true
186	}
187	var (
188		isSecure bool
189		authInfo credentials.AuthInfo
190	)
191	transportCreds := opts.TransportCredentials
192	perRPCCreds := opts.PerRPCCredentials
193
194	if b := opts.CredsBundle; b != nil {
195		if t := b.TransportCredentials(); t != nil {
196			transportCreds = t
197		}
198		if t := b.PerRPCCredentials(); t != nil {
199			perRPCCreds = append(perRPCCreds, t)
200		}
201	}
202	if transportCreds != nil {
203		scheme = "https"
204		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
205		if err != nil {
206			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
207		}
208		isSecure = true
209	}
210	dynamicWindow := true
211	icwz := int32(initialWindowSize)
212	if opts.InitialConnWindowSize >= defaultWindowSize {
213		icwz = opts.InitialConnWindowSize
214		dynamicWindow = false
215	}
216	writeBufSize := opts.WriteBufferSize
217	readBufSize := opts.ReadBufferSize
218	maxHeaderListSize := defaultClientMaxHeaderListSize
219	if opts.MaxHeaderListSize != nil {
220		maxHeaderListSize = *opts.MaxHeaderListSize
221	}
222	t := &http2Client{
223		ctx:                   ctx,
224		ctxDone:               ctx.Done(), // Cache Done chan.
225		cancel:                cancel,
226		userAgent:             opts.UserAgent,
227		md:                    addr.Metadata,
228		conn:                  conn,
229		remoteAddr:            conn.RemoteAddr(),
230		localAddr:             conn.LocalAddr(),
231		authInfo:              authInfo,
232		readerDone:            make(chan struct{}),
233		writerDone:            make(chan struct{}),
234		goAway:                make(chan struct{}),
235		awakenKeepalive:       make(chan struct{}, 1),
236		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
237		fc:                    &trInFlow{limit: uint32(icwz)},
238		scheme:                scheme,
239		activeStreams:         make(map[uint32]*Stream),
240		isSecure:              isSecure,
241		perRPCCreds:           perRPCCreds,
242		kp:                    kp,
243		statsHandler:          opts.StatsHandler,
244		initialWindowSize:     initialWindowSize,
245		onPrefaceReceipt:      onPrefaceReceipt,
246		nextID:                1,
247		maxConcurrentStreams:  defaultMaxStreamsClient,
248		streamQuota:           defaultMaxStreamsClient,
249		streamsQuotaAvailable: make(chan struct{}, 1),
250		czData:                new(channelzData),
251		onGoAway:              onGoAway,
252		onClose:               onClose,
253		keepaliveEnabled:      keepaliveEnabled,
254		bufferPool:            newBufferPool(),
255	}
256	t.controlBuf = newControlBuffer(t.ctxDone)
257	if opts.InitialWindowSize >= defaultWindowSize {
258		t.initialWindowSize = opts.InitialWindowSize
259		dynamicWindow = false
260	}
261	if dynamicWindow {
262		t.bdpEst = &bdpEstimator{
263			bdp:               initialWindowSize,
264			updateFlowControl: t.updateFlowControl,
265		}
266	}
267	// Make sure awakenKeepalive can't be written upon.
268	// keepalive routine will make it writable, if need be.
269	t.awakenKeepalive <- struct{}{}
270	if t.statsHandler != nil {
271		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
272			RemoteAddr: t.remoteAddr,
273			LocalAddr:  t.localAddr,
274		})
275		connBegin := &stats.ConnBegin{
276			Client: true,
277		}
278		t.statsHandler.HandleConn(t.ctx, connBegin)
279	}
280	if channelz.IsOn() {
281		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
282	}
283	if t.keepaliveEnabled {
284		go t.keepalive()
285	}
286	// Start the reader goroutine for incoming message. Each transport has
287	// a dedicated goroutine which reads HTTP2 frame from network. Then it
288	// dispatches the frame to the corresponding stream entity.
289	go t.reader()
290
291	// Send connection preface to server.
292	n, err := t.conn.Write(clientPreface)
293	if err != nil {
294		t.Close()
295		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
296	}
297	if n != len(clientPreface) {
298		t.Close()
299		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
300	}
301	var ss []http2.Setting
302
303	if t.initialWindowSize != defaultWindowSize {
304		ss = append(ss, http2.Setting{
305			ID:  http2.SettingInitialWindowSize,
306			Val: uint32(t.initialWindowSize),
307		})
308	}
309	if opts.MaxHeaderListSize != nil {
310		ss = append(ss, http2.Setting{
311			ID:  http2.SettingMaxHeaderListSize,
312			Val: *opts.MaxHeaderListSize,
313		})
314	}
315	err = t.framer.fr.WriteSettings(ss...)
316	if err != nil {
317		t.Close()
318		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
319	}
320	// Adjust the connection flow control window if needed.
321	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
322		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
323			t.Close()
324			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
325		}
326	}
327
328	if err := t.framer.writer.Flush(); err != nil {
329		return nil, err
330	}
331	go func() {
332		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
333		err := t.loopy.run()
334		if err != nil {
335			errorf("transport: loopyWriter.run returning. Err: %v", err)
336		}
337		// If it's a connection error, let reader goroutine handle it
338		// since there might be data in the buffers.
339		if _, ok := err.(net.Error); !ok {
340			t.conn.Close()
341		}
342		close(t.writerDone)
343	}()
344	return t, nil
345}
346
347func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
348	// TODO(zhaoq): Handle uint32 overflow of Stream.id.
349	s := &Stream{
350		done:           make(chan struct{}),
351		method:         callHdr.Method,
352		sendCompress:   callHdr.SendCompress,
353		buf:            newRecvBuffer(),
354		headerChan:     make(chan struct{}),
355		contentSubtype: callHdr.ContentSubtype,
356	}
357	s.wq = newWriteQuota(defaultWriteQuota, s.done)
358	s.requestRead = func(n int) {
359		t.adjustWindow(s, uint32(n))
360	}
361	// The client side stream context should have exactly the same life cycle with the user provided context.
362	// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
363	// So we use the original context here instead of creating a copy.
364	s.ctx = ctx
365	s.trReader = &transportReader{
366		reader: &recvBufferReader{
367			ctx:     s.ctx,
368			ctxDone: s.ctx.Done(),
369			recv:    s.buf,
370			closeStream: func(err error) {
371				t.CloseStream(s, err)
372			},
373			freeBuffer: t.bufferPool.put,
374		},
375		windowHandler: func(n int) {
376			t.updateWindow(s, uint32(n))
377		},
378	}
379	return s
380}
381
382func (t *http2Client) getPeer() *peer.Peer {
383	pr := &peer.Peer{
384		Addr: t.remoteAddr,
385	}
386	// Attach Auth info if there is any.
387	if t.authInfo != nil {
388		pr.AuthInfo = t.authInfo
389	}
390	return pr
391}
392
393func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
394	aud := t.createAudience(callHdr)
395	authData, err := t.getTrAuthData(ctx, aud)
396	if err != nil {
397		return nil, err
398	}
399	callAuthData, err := t.getCallAuthData(ctx, aud, callHdr)
400	if err != nil {
401		return nil, err
402	}
403	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
404	// first and create a slice of that exact size.
405	// Make the slice of certain predictable size to reduce allocations made by append.
406	hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
407	hfLen += len(authData) + len(callAuthData)
408	headerFields := make([]hpack.HeaderField, 0, hfLen)
409	headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
410	headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
411	headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
412	headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
413	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
414	headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
415	headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
416	if callHdr.PreviousAttempts > 0 {
417		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
418	}
419
420	if callHdr.SendCompress != "" {
421		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
422	}
423	if dl, ok := ctx.Deadline(); ok {
424		// Send out timeout regardless its value. The server can detect timeout context by itself.
425		// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
426		timeout := time.Until(dl)
427		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
428	}
429	for k, v := range authData {
430		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
431	}
432	for k, v := range callAuthData {
433		headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
434	}
435	if b := stats.OutgoingTags(ctx); b != nil {
436		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
437	}
438	if b := stats.OutgoingTrace(ctx); b != nil {
439		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
440	}
441
442	if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
443		var k string
444		for k, vv := range md {
445			// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
446			if isReservedHeader(k) {
447				continue
448			}
449			for _, v := range vv {
450				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
451			}
452		}
453		for _, vv := range added {
454			for i, v := range vv {
455				if i%2 == 0 {
456					k = v
457					continue
458				}
459				// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
460				if isReservedHeader(k) {
461					continue
462				}
463				headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
464			}
465		}
466	}
467	if md, ok := t.md.(*metadata.MD); ok {
468		for k, vv := range *md {
469			if isReservedHeader(k) {
470				continue
471			}
472			for _, v := range vv {
473				headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
474			}
475		}
476	}
477	return headerFields, nil
478}
479
480func (t *http2Client) createAudience(callHdr *CallHdr) string {
481	// Create an audience string only if needed.
482	if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
483		return ""
484	}
485	// Construct URI required to get auth request metadata.
486	// Omit port if it is the default one.
487	host := strings.TrimSuffix(callHdr.Host, ":443")
488	pos := strings.LastIndex(callHdr.Method, "/")
489	if pos == -1 {
490		pos = len(callHdr.Method)
491	}
492	return "https://" + host + callHdr.Method[:pos]
493}
494
495func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
496	authData := map[string]string{}
497	for _, c := range t.perRPCCreds {
498		data, err := c.GetRequestMetadata(ctx, audience)
499		if err != nil {
500			if _, ok := status.FromError(err); ok {
501				return nil, err
502			}
503
504			return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
505		}
506		for k, v := range data {
507			// Capital header names are illegal in HTTP/2.
508			k = strings.ToLower(k)
509			authData[k] = v
510		}
511	}
512	return authData, nil
513}
514
515func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
516	callAuthData := map[string]string{}
517	// Check if credentials.PerRPCCredentials were provided via call options.
518	// Note: if these credentials are provided both via dial options and call
519	// options, then both sets of credentials will be applied.
520	if callCreds := callHdr.Creds; callCreds != nil {
521		if !t.isSecure && callCreds.RequireTransportSecurity() {
522			return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
523		}
524		data, err := callCreds.GetRequestMetadata(ctx, audience)
525		if err != nil {
526			return nil, status.Errorf(codes.Internal, "transport: %v", err)
527		}
528		for k, v := range data {
529			// Capital header names are illegal in HTTP/2
530			k = strings.ToLower(k)
531			callAuthData[k] = v
532		}
533	}
534	return callAuthData, nil
535}
536
537// NewStream creates a stream and registers it into the transport as "active"
538// streams.
539func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
540	ctx = peer.NewContext(ctx, t.getPeer())
541	headerFields, err := t.createHeaderFields(ctx, callHdr)
542	if err != nil {
543		return nil, err
544	}
545	s := t.newStream(ctx, callHdr)
546	cleanup := func(err error) {
547		if s.swapState(streamDone) == streamDone {
548			// If it was already done, return.
549			return
550		}
551		// The stream was unprocessed by the server.
552		atomic.StoreUint32(&s.unprocessed, 1)
553		s.write(recvMsg{err: err})
554		close(s.done)
555		// If headerChan isn't closed, then close it.
556		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
557			close(s.headerChan)
558		}
559
560	}
561	hdr := &headerFrame{
562		hf:        headerFields,
563		endStream: false,
564		initStream: func(id uint32) (bool, error) {
565			t.mu.Lock()
566			if state := t.state; state != reachable {
567				t.mu.Unlock()
568				// Do a quick cleanup.
569				err := error(errStreamDrain)
570				if state == closing {
571					err = ErrConnClosing
572				}
573				cleanup(err)
574				return false, err
575			}
576			t.activeStreams[id] = s
577			if channelz.IsOn() {
578				atomic.AddInt64(&t.czData.streamsStarted, 1)
579				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
580			}
581			var sendPing bool
582			// If the number of active streams change from 0 to 1, then check if keepalive
583			// has gone dormant. If so, wake it up.
584			if len(t.activeStreams) == 1 && t.keepaliveEnabled {
585				select {
586				case t.awakenKeepalive <- struct{}{}:
587					sendPing = true
588					// Fill the awakenKeepalive channel again as this channel must be
589					// kept non-writable except at the point that the keepalive()
590					// goroutine is waiting either to be awaken or shutdown.
591					t.awakenKeepalive <- struct{}{}
592				default:
593				}
594			}
595			t.mu.Unlock()
596			return sendPing, nil
597		},
598		onOrphaned: cleanup,
599		wq:         s.wq,
600	}
601	firstTry := true
602	var ch chan struct{}
603	checkForStreamQuota := func(it interface{}) bool {
604		if t.streamQuota <= 0 { // Can go negative if server decreases it.
605			if firstTry {
606				t.waitingStreams++
607			}
608			ch = t.streamsQuotaAvailable
609			return false
610		}
611		if !firstTry {
612			t.waitingStreams--
613		}
614		t.streamQuota--
615		h := it.(*headerFrame)
616		h.streamID = t.nextID
617		t.nextID += 2
618		s.id = h.streamID
619		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
620		if t.streamQuota > 0 && t.waitingStreams > 0 {
621			select {
622			case t.streamsQuotaAvailable <- struct{}{}:
623			default:
624			}
625		}
626		return true
627	}
628	var hdrListSizeErr error
629	checkForHeaderListSize := func(it interface{}) bool {
630		if t.maxSendHeaderListSize == nil {
631			return true
632		}
633		hdrFrame := it.(*headerFrame)
634		var sz int64
635		for _, f := range hdrFrame.hf {
636			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
637				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
638				return false
639			}
640		}
641		return true
642	}
643	for {
644		success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
645			if !checkForStreamQuota(it) {
646				return false
647			}
648			if !checkForHeaderListSize(it) {
649				return false
650			}
651			return true
652		}, hdr)
653		if err != nil {
654			return nil, err
655		}
656		if success {
657			break
658		}
659		if hdrListSizeErr != nil {
660			return nil, hdrListSizeErr
661		}
662		firstTry = false
663		select {
664		case <-ch:
665		case <-s.ctx.Done():
666			return nil, ContextErr(s.ctx.Err())
667		case <-t.goAway:
668			return nil, errStreamDrain
669		case <-t.ctx.Done():
670			return nil, ErrConnClosing
671		}
672	}
673	if t.statsHandler != nil {
674		outHeader := &stats.OutHeader{
675			Client:      true,
676			FullMethod:  callHdr.Method,
677			RemoteAddr:  t.remoteAddr,
678			LocalAddr:   t.localAddr,
679			Compression: callHdr.SendCompress,
680		}
681		t.statsHandler.HandleRPC(s.ctx, outHeader)
682	}
683	return s, nil
684}
685
686// CloseStream clears the footprint of a stream when the stream is not needed any more.
687// This must not be executed in reader's goroutine.
688func (t *http2Client) CloseStream(s *Stream, err error) {
689	var (
690		rst     bool
691		rstCode http2.ErrCode
692	)
693	if err != nil {
694		rst = true
695		rstCode = http2.ErrCodeCancel
696	}
697	t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
698}
699
700func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
701	// Set stream status to done.
702	if s.swapState(streamDone) == streamDone {
703		// If it was already done, return.  If multiple closeStream calls
704		// happen simultaneously, wait for the first to finish.
705		<-s.done
706		return
707	}
708	// status and trailers can be updated here without any synchronization because the stream goroutine will
709	// only read it after it sees an io.EOF error from read or write and we'll write those errors
710	// only after updating this.
711	s.status = st
712	if len(mdata) > 0 {
713		s.trailer = mdata
714	}
715	if err != nil {
716		// This will unblock reads eventually.
717		s.write(recvMsg{err: err})
718	}
719	// If headerChan isn't closed, then close it.
720	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
721		s.noHeaders = true
722		close(s.headerChan)
723	}
724	cleanup := &cleanupStream{
725		streamID: s.id,
726		onWrite: func() {
727			t.mu.Lock()
728			if t.activeStreams != nil {
729				delete(t.activeStreams, s.id)
730			}
731			t.mu.Unlock()
732			if channelz.IsOn() {
733				if eosReceived {
734					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
735				} else {
736					atomic.AddInt64(&t.czData.streamsFailed, 1)
737				}
738			}
739		},
740		rst:     rst,
741		rstCode: rstCode,
742	}
743	addBackStreamQuota := func(interface{}) bool {
744		t.streamQuota++
745		if t.streamQuota > 0 && t.waitingStreams > 0 {
746			select {
747			case t.streamsQuotaAvailable <- struct{}{}:
748			default:
749			}
750		}
751		return true
752	}
753	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
754	// This will unblock write.
755	close(s.done)
756}
757
758// Close kicks off the shutdown process of the transport. This should be called
759// only once on a transport. Once it is called, the transport should not be
760// accessed any more.
761//
762// This method blocks until the addrConn that initiated this transport is
763// re-connected. This happens because t.onClose() begins reconnect logic at the
764// addrConn level and blocks until the addrConn is successfully connected.
765func (t *http2Client) Close() error {
766	t.mu.Lock()
767	// Make sure we only Close once.
768	if t.state == closing {
769		t.mu.Unlock()
770		return nil
771	}
772	t.state = closing
773	streams := t.activeStreams
774	t.activeStreams = nil
775	t.mu.Unlock()
776	t.controlBuf.finish()
777	t.cancel()
778	err := t.conn.Close()
779	if channelz.IsOn() {
780		channelz.RemoveEntry(t.channelzID)
781	}
782	// Notify all active streams.
783	for _, s := range streams {
784		t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
785	}
786	if t.statsHandler != nil {
787		connEnd := &stats.ConnEnd{
788			Client: true,
789		}
790		t.statsHandler.HandleConn(t.ctx, connEnd)
791	}
792	t.onClose()
793	return err
794}
795
796// GracefulClose sets the state to draining, which prevents new streams from
797// being created and causes the transport to be closed when the last active
798// stream is closed.  If there are no active streams, the transport is closed
799// immediately.  This does nothing if the transport is already draining or
800// closing.
801func (t *http2Client) GracefulClose() {
802	t.mu.Lock()
803	// Make sure we move to draining only from active.
804	if t.state == draining || t.state == closing {
805		t.mu.Unlock()
806		return
807	}
808	t.state = draining
809	active := len(t.activeStreams)
810	t.mu.Unlock()
811	if active == 0 {
812		t.Close()
813		return
814	}
815	t.controlBuf.put(&incomingGoAway{})
816}
817
818// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
819// should proceed only if Write returns nil.
820func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
821	if opts.Last {
822		// If it's the last message, update stream state.
823		if !s.compareAndSwapState(streamActive, streamWriteDone) {
824			return errStreamDone
825		}
826	} else if s.getState() != streamActive {
827		return errStreamDone
828	}
829	df := &dataFrame{
830		streamID:  s.id,
831		endStream: opts.Last,
832	}
833	if hdr != nil || data != nil { // If it's not an empty data frame.
834		// Add some data to grpc message header so that we can equally
835		// distribute bytes across frames.
836		emptyLen := http2MaxFrameLen - len(hdr)
837		if emptyLen > len(data) {
838			emptyLen = len(data)
839		}
840		hdr = append(hdr, data[:emptyLen]...)
841		data = data[emptyLen:]
842		df.h, df.d = hdr, data
843		// TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
844		if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
845			return err
846		}
847	}
848	return t.controlBuf.put(df)
849}
850
851func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
852	t.mu.Lock()
853	defer t.mu.Unlock()
854	s, ok := t.activeStreams[f.Header().StreamID]
855	return s, ok
856}
857
858// adjustWindow sends out extra window update over the initial window size
859// of stream if the application is requesting data larger in size than
860// the window.
861func (t *http2Client) adjustWindow(s *Stream, n uint32) {
862	if w := s.fc.maybeAdjust(n); w > 0 {
863		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
864	}
865}
866
867// updateWindow adjusts the inbound quota for the stream.
868// Window updates will be sent out when the cumulative quota
869// exceeds the corresponding threshold.
870func (t *http2Client) updateWindow(s *Stream, n uint32) {
871	if w := s.fc.onRead(n); w > 0 {
872		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
873	}
874}
875
876// updateFlowControl updates the incoming flow control windows
877// for the transport and the stream based on the current bdp
878// estimation.
879func (t *http2Client) updateFlowControl(n uint32) {
880	t.mu.Lock()
881	for _, s := range t.activeStreams {
882		s.fc.newLimit(n)
883	}
884	t.mu.Unlock()
885	updateIWS := func(interface{}) bool {
886		t.initialWindowSize = int32(n)
887		return true
888	}
889	t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
890	t.controlBuf.put(&outgoingSettings{
891		ss: []http2.Setting{
892			{
893				ID:  http2.SettingInitialWindowSize,
894				Val: n,
895			},
896		},
897	})
898}
899
900func (t *http2Client) handleData(f *http2.DataFrame) {
901	size := f.Header().Length
902	var sendBDPPing bool
903	if t.bdpEst != nil {
904		sendBDPPing = t.bdpEst.add(size)
905	}
906	// Decouple connection's flow control from application's read.
907	// An update on connection's flow control should not depend on
908	// whether user application has read the data or not. Such a
909	// restriction is already imposed on the stream's flow control,
910	// and therefore the sender will be blocked anyways.
911	// Decoupling the connection flow control will prevent other
912	// active(fast) streams from starving in presence of slow or
913	// inactive streams.
914	//
915	if w := t.fc.onData(size); w > 0 {
916		t.controlBuf.put(&outgoingWindowUpdate{
917			streamID:  0,
918			increment: w,
919		})
920	}
921	if sendBDPPing {
922		// Avoid excessive ping detection (e.g. in an L7 proxy)
923		// by sending a window update prior to the BDP ping.
924
925		if w := t.fc.reset(); w > 0 {
926			t.controlBuf.put(&outgoingWindowUpdate{
927				streamID:  0,
928				increment: w,
929			})
930		}
931
932		t.controlBuf.put(bdpPing)
933	}
934	// Select the right stream to dispatch.
935	s, ok := t.getStream(f)
936	if !ok {
937		return
938	}
939	if size > 0 {
940		if err := s.fc.onData(size); err != nil {
941			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
942			return
943		}
944		if f.Header().Flags.Has(http2.FlagDataPadded) {
945			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
946				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
947			}
948		}
949		// TODO(bradfitz, zhaoq): A copy is required here because there is no
950		// guarantee f.Data() is consumed before the arrival of next frame.
951		// Can this copy be eliminated?
952		if len(f.Data()) > 0 {
953			buffer := t.bufferPool.get()
954			buffer.Reset()
955			buffer.Write(f.Data())
956			s.write(recvMsg{buffer: buffer})
957		}
958	}
959	// The server has closed the stream without sending trailers.  Record that
960	// the read direction is closed, and set the status appropriately.
961	if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
962		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
963	}
964}
965
966func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
967	s, ok := t.getStream(f)
968	if !ok {
969		return
970	}
971	if f.ErrCode == http2.ErrCodeRefusedStream {
972		// The stream was unprocessed by the server.
973		atomic.StoreUint32(&s.unprocessed, 1)
974	}
975	statusCode, ok := http2ErrConvTab[f.ErrCode]
976	if !ok {
977		warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
978		statusCode = codes.Unknown
979	}
980	if statusCode == codes.Canceled {
981		// Our deadline was already exceeded, and that was likely the cause of
982		// this cancelation.  Alter the status code accordingly.
983		if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) {
984			statusCode = codes.DeadlineExceeded
985		}
986	}
987	t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
988}
989
990func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
991	if f.IsAck() {
992		return
993	}
994	var maxStreams *uint32
995	var ss []http2.Setting
996	var updateFuncs []func()
997	f.ForeachSetting(func(s http2.Setting) error {
998		switch s.ID {
999		case http2.SettingMaxConcurrentStreams:
1000			maxStreams = new(uint32)
1001			*maxStreams = s.Val
1002		case http2.SettingMaxHeaderListSize:
1003			updateFuncs = append(updateFuncs, func() {
1004				t.maxSendHeaderListSize = new(uint32)
1005				*t.maxSendHeaderListSize = s.Val
1006			})
1007		default:
1008			ss = append(ss, s)
1009		}
1010		return nil
1011	})
1012	if isFirst && maxStreams == nil {
1013		maxStreams = new(uint32)
1014		*maxStreams = math.MaxUint32
1015	}
1016	sf := &incomingSettings{
1017		ss: ss,
1018	}
1019	if maxStreams != nil {
1020		updateStreamQuota := func() {
1021			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1022			t.maxConcurrentStreams = *maxStreams
1023			t.streamQuota += delta
1024			if delta > 0 && t.waitingStreams > 0 {
1025				close(t.streamsQuotaAvailable) // wake all of them up.
1026				t.streamsQuotaAvailable = make(chan struct{}, 1)
1027			}
1028		}
1029		updateFuncs = append(updateFuncs, updateStreamQuota)
1030	}
1031	t.controlBuf.executeAndPut(func(interface{}) bool {
1032		for _, f := range updateFuncs {
1033			f()
1034		}
1035		return true
1036	}, sf)
1037}
1038
1039func (t *http2Client) handlePing(f *http2.PingFrame) {
1040	if f.IsAck() {
1041		// Maybe it's a BDP ping.
1042		if t.bdpEst != nil {
1043			t.bdpEst.calculate(f.Data)
1044		}
1045		return
1046	}
1047	pingAck := &ping{ack: true}
1048	copy(pingAck.data[:], f.Data[:])
1049	t.controlBuf.put(pingAck)
1050}
1051
1052func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
1053	t.mu.Lock()
1054	if t.state == closing {
1055		t.mu.Unlock()
1056		return
1057	}
1058	if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
1059		infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
1060	}
1061	id := f.LastStreamID
1062	if id > 0 && id%2 != 1 {
1063		t.mu.Unlock()
1064		t.Close()
1065		return
1066	}
1067	// A client can receive multiple GoAways from the server (see
1068	// https://github.com/grpc/grpc-go/issues/1387).  The idea is that the first
1069	// GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
1070	// sent after an RTT delay with the ID of the last stream the server will
1071	// process.
1072	//
1073	// Therefore, when we get the first GoAway we don't necessarily close any
1074	// streams. While in case of second GoAway we close all streams created after
1075	// the GoAwayId. This way streams that were in-flight while the GoAway from
1076	// server was being sent don't get killed.
1077	select {
1078	case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1079		// If there are multiple GoAways the first one should always have an ID greater than the following ones.
1080		if id > t.prevGoAwayID {
1081			t.mu.Unlock()
1082			t.Close()
1083			return
1084		}
1085	default:
1086		t.setGoAwayReason(f)
1087		close(t.goAway)
1088		t.state = draining
1089		t.controlBuf.put(&incomingGoAway{})
1090
1091		// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
1092		t.onGoAway(t.goAwayReason)
1093	}
1094	// All streams with IDs greater than the GoAwayId
1095	// and smaller than the previous GoAway ID should be killed.
1096	upperLimit := t.prevGoAwayID
1097	if upperLimit == 0 { // This is the first GoAway Frame.
1098		upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1099	}
1100	for streamID, stream := range t.activeStreams {
1101		if streamID > id && streamID <= upperLimit {
1102			// The stream was unprocessed by the server.
1103			atomic.StoreUint32(&stream.unprocessed, 1)
1104			t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1105		}
1106	}
1107	t.prevGoAwayID = id
1108	active := len(t.activeStreams)
1109	t.mu.Unlock()
1110	if active == 0 {
1111		t.Close()
1112	}
1113}
1114
1115// setGoAwayReason sets the value of t.goAwayReason based
1116// on the GoAway frame received.
1117// It expects a lock on transport's mutext to be held by
1118// the caller.
1119func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1120	t.goAwayReason = GoAwayNoReason
1121	switch f.ErrCode {
1122	case http2.ErrCodeEnhanceYourCalm:
1123		if string(f.DebugData()) == "too_many_pings" {
1124			t.goAwayReason = GoAwayTooManyPings
1125		}
1126	}
1127}
1128
1129func (t *http2Client) GetGoAwayReason() GoAwayReason {
1130	t.mu.Lock()
1131	defer t.mu.Unlock()
1132	return t.goAwayReason
1133}
1134
1135func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1136	t.controlBuf.put(&incomingWindowUpdate{
1137		streamID:  f.Header().StreamID,
1138		increment: f.Increment,
1139	})
1140}
1141
1142// operateHeaders takes action on the decoded headers.
1143func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1144	s, ok := t.getStream(frame)
1145	if !ok {
1146		return
1147	}
1148	endStream := frame.StreamEnded()
1149	atomic.StoreUint32(&s.bytesReceived, 1)
1150	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
1151
1152	if !initialHeader && !endStream {
1153		// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
1154		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1155		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1156		return
1157	}
1158
1159	state := &decodeState{}
1160	// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
1161	state.data.isGRPC = !initialHeader
1162	if err := state.decodeHeader(frame); err != nil {
1163		t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
1164		return
1165	}
1166
1167	isHeader := false
1168	defer func() {
1169		if t.statsHandler != nil {
1170			if isHeader {
1171				inHeader := &stats.InHeader{
1172					Client:     true,
1173					WireLength: int(frame.Header().Length),
1174				}
1175				t.statsHandler.HandleRPC(s.ctx, inHeader)
1176			} else {
1177				inTrailer := &stats.InTrailer{
1178					Client:     true,
1179					WireLength: int(frame.Header().Length),
1180				}
1181				t.statsHandler.HandleRPC(s.ctx, inTrailer)
1182			}
1183		}
1184	}()
1185
1186	// If headerChan hasn't been closed yet
1187	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
1188		if !endStream {
1189			// HEADERS frame block carries a Response-Headers.
1190			isHeader = true
1191			// These values can be set without any synchronization because
1192			// stream goroutine will read it only after seeing a closed
1193			// headerChan which we'll close after setting this.
1194			s.recvCompress = state.data.encoding
1195			if len(state.data.mdata) > 0 {
1196				s.header = state.data.mdata
1197			}
1198		} else {
1199			// HEADERS frame block carries a Trailers-Only.
1200			s.noHeaders = true
1201		}
1202		close(s.headerChan)
1203	}
1204
1205	if !endStream {
1206		return
1207	}
1208
1209	// if client received END_STREAM from server while stream was still active, send RST_STREAM
1210	rst := s.getState() == streamActive
1211	t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
1212}
1213
1214// reader runs as a separate goroutine in charge of reading data from network
1215// connection.
1216//
1217// TODO(zhaoq): currently one reader per transport. Investigate whether this is
1218// optimal.
1219// TODO(zhaoq): Check the validity of the incoming frame sequence.
1220func (t *http2Client) reader() {
1221	defer close(t.readerDone)
1222	// Check the validity of server preface.
1223	frame, err := t.framer.fr.ReadFrame()
1224	if err != nil {
1225		t.Close() // this kicks off resetTransport, so must be last before return
1226		return
1227	}
1228	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
1229	if t.keepaliveEnabled {
1230		atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1231	}
1232	sf, ok := frame.(*http2.SettingsFrame)
1233	if !ok {
1234		t.Close() // this kicks off resetTransport, so must be last before return
1235		return
1236	}
1237	t.onPrefaceReceipt()
1238	t.handleSettings(sf, true)
1239
1240	// loop to keep reading incoming messages on this transport.
1241	for {
1242		frame, err := t.framer.fr.ReadFrame()
1243		if t.keepaliveEnabled {
1244			atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1245		}
1246		if err != nil {
1247			// Abort an active stream if the http2.Framer returns a
1248			// http2.StreamError. This can happen only if the server's response
1249			// is malformed http2.
1250			if se, ok := err.(http2.StreamError); ok {
1251				t.mu.Lock()
1252				s := t.activeStreams[se.StreamID]
1253				t.mu.Unlock()
1254				if s != nil {
1255					// use error detail to provide better err message
1256					code := http2ErrConvTab[se.Code]
1257					msg := t.framer.fr.ErrorDetail().Error()
1258					t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1259				}
1260				continue
1261			} else {
1262				// Transport error.
1263				t.Close()
1264				return
1265			}
1266		}
1267		switch frame := frame.(type) {
1268		case *http2.MetaHeadersFrame:
1269			t.operateHeaders(frame)
1270		case *http2.DataFrame:
1271			t.handleData(frame)
1272		case *http2.RSTStreamFrame:
1273			t.handleRSTStream(frame)
1274		case *http2.SettingsFrame:
1275			t.handleSettings(frame, false)
1276		case *http2.PingFrame:
1277			t.handlePing(frame)
1278		case *http2.GoAwayFrame:
1279			t.handleGoAway(frame)
1280		case *http2.WindowUpdateFrame:
1281			t.handleWindowUpdate(frame)
1282		default:
1283			errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1284		}
1285	}
1286}
1287
1288// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
1289func (t *http2Client) keepalive() {
1290	p := &ping{data: [8]byte{}}
1291	timer := time.NewTimer(t.kp.Time)
1292	for {
1293		select {
1294		case <-timer.C:
1295			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1296				timer.Reset(t.kp.Time)
1297				continue
1298			}
1299			// Check if keepalive should go dormant.
1300			t.mu.Lock()
1301			if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1302				// Make awakenKeepalive writable.
1303				<-t.awakenKeepalive
1304				t.mu.Unlock()
1305				select {
1306				case <-t.awakenKeepalive:
1307					// If the control gets here a ping has been sent
1308					// need to reset the timer with keepalive.Timeout.
1309				case <-t.ctx.Done():
1310					return
1311				}
1312			} else {
1313				t.mu.Unlock()
1314				if channelz.IsOn() {
1315					atomic.AddInt64(&t.czData.kpCount, 1)
1316				}
1317				// Send ping.
1318				t.controlBuf.put(p)
1319			}
1320
1321			// By the time control gets here a ping has been sent one way or the other.
1322			timer.Reset(t.kp.Timeout)
1323			select {
1324			case <-timer.C:
1325				if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1326					timer.Reset(t.kp.Time)
1327					continue
1328				}
1329				t.Close()
1330				return
1331			case <-t.ctx.Done():
1332				if !timer.Stop() {
1333					<-timer.C
1334				}
1335				return
1336			}
1337		case <-t.ctx.Done():
1338			if !timer.Stop() {
1339				<-timer.C
1340			}
1341			return
1342		}
1343	}
1344}
1345
1346func (t *http2Client) Error() <-chan struct{} {
1347	return t.ctx.Done()
1348}
1349
1350func (t *http2Client) GoAway() <-chan struct{} {
1351	return t.goAway
1352}
1353
1354func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
1355	s := channelz.SocketInternalMetric{
1356		StreamsStarted:                  atomic.LoadInt64(&t.czData.streamsStarted),
1357		StreamsSucceeded:                atomic.LoadInt64(&t.czData.streamsSucceeded),
1358		StreamsFailed:                   atomic.LoadInt64(&t.czData.streamsFailed),
1359		MessagesSent:                    atomic.LoadInt64(&t.czData.msgSent),
1360		MessagesReceived:                atomic.LoadInt64(&t.czData.msgRecv),
1361		KeepAlivesSent:                  atomic.LoadInt64(&t.czData.kpCount),
1362		LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1363		LastMessageSentTimestamp:        time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1364		LastMessageReceivedTimestamp:    time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1365		LocalFlowControlWindow:          int64(t.fc.getSize()),
1366		SocketOptions:                   channelz.GetSocketOption(t.conn),
1367		LocalAddr:                       t.localAddr,
1368		RemoteAddr:                      t.remoteAddr,
1369		// RemoteName :
1370	}
1371	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1372		s.Security = au.GetSecurityValue()
1373	}
1374	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1375	return &s
1376}
1377
1378func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
1379
1380func (t *http2Client) IncrMsgSent() {
1381	atomic.AddInt64(&t.czData.msgSent, 1)
1382	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1383}
1384
1385func (t *http2Client) IncrMsgRecv() {
1386	atomic.AddInt64(&t.czData.msgRecv, 1)
1387	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1388}
1389
1390func (t *http2Client) getOutFlowWindow() int64 {
1391	resp := make(chan uint32, 1)
1392	timer := time.NewTimer(time.Second)
1393	defer timer.Stop()
1394	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1395	select {
1396	case sz := <-resp:
1397		return int64(sz)
1398	case <-t.ctxDone:
1399		return -1
1400	case <-timer.C:
1401		return -2
1402	}
1403}
1404