1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"context"
24	"errors"
25	"fmt"
26	"io"
27	"math"
28	"net"
29	"strconv"
30	"sync"
31	"sync/atomic"
32	"time"
33
34	"github.com/golang/protobuf/proto"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37
38	spb "google.golang.org/genproto/googleapis/rpc/status"
39	"google.golang.org/grpc/codes"
40	"google.golang.org/grpc/credentials"
41	"google.golang.org/grpc/grpclog"
42	"google.golang.org/grpc/internal"
43	"google.golang.org/grpc/internal/channelz"
44	"google.golang.org/grpc/internal/grpcrand"
45	"google.golang.org/grpc/keepalive"
46	"google.golang.org/grpc/metadata"
47	"google.golang.org/grpc/peer"
48	"google.golang.org/grpc/stats"
49	"google.golang.org/grpc/status"
50	"google.golang.org/grpc/tap"
51)
52
53var (
54	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
55	// the stream's state.
56	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58	// than the limit set by peer.
59	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60	// statusRawProto is a function to get to the raw status proto wrapped in a
61	// status.Status without a proto.Clone().
62	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
63)
64
// http2Server implements the ServerTransport interface with HTTP2.
//
// One http2Server is built per accepted connection by newHTTP2Server.
// HandleStreams reads and dispatches incoming frames, while outgoing frames
// and control items are queued on controlBuf and written by the loopyWriter
// goroutine started in newHTTP2Server.
type http2Server struct {
	ctx         context.Context
	ctxDone     <-chan struct{} // Cache the context.Done() chan
	cancel      context.CancelFunc
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // Closed when HandleStreams returns; sync point to enable testing.
	writerDone  chan struct{} // Closed when the loopy writer goroutine exits; sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams (math.MaxUint32 when unlimited).
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	// fc is the connection-level inbound flow-control window, and stats, if
	// non-nil, receives connection and RPC events.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Flag to keep track of reading activity on transport: set to 1 by the
	// reader whenever a frame is read, and cleared again by keepalive().
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server, with zero values
	// replaced by defaults in newHTTP2Server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instant the last (non-ACK) ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	// mu guards drainChan, state, activeStreams and idle. (maxStreamID and
	// initialWindowSize above are also updated while it is held.)
	mu sync.Mutex

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway (with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway (with ID 2^31-1) frame.
	// Thus a call to drain(...) is a no-op if drainChan is already initialized,
	// since draining is already underway. handlePing closes it when the client
	// acknowledges goAwayPing.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time
	// (keepalive checks it with IsZero).
	idle time.Time

	// channelzID and czData are used for channelz metric collection;
	// bufferPool recycles the buffers that carry received DATA payloads to
	// streams.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool
}
129
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// The steps, in order, are:
//   - write the server's initial SETTINGS frame, plus a connection-level
//     WINDOW_UPDATE when the configured connection window is larger than
//     defaultWindowSize;
//   - fill in keepalive defaults and build the http2Server;
//   - read and validate the client connection preface, then the client's
//     first frame, which must be a SETTINGS frame;
//   - start the loopy writer and keepalive goroutines.
//
// If the client goes away before sending that first frame, io.EOF or
// io.ErrUnexpectedEOF is returned unwrapped. Once the http2Server has been
// built, any error return also closes it (see the deferred t.Close()).
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	var isettings []http2.Setting
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// Windows are sized dynamically by the BDP estimator unless the user
	// explicitly configured either window to at least defaultWindowSize.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	// Only advertise SETTINGS_INITIAL_WINDOW_SIZE when it differs from
	// defaultWindowSize.
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	// SETTINGS_INITIAL_WINDOW_SIZE does not apply to the connection window, so
	// grow it with a WINDOW_UPDATE on stream 0 by however much icwz exceeds
	// defaultWindowSize.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Zero-valued keepalive parameters and enforcement policy fall back to
	// the package defaults.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &http2Server{
		ctx:               ctx,
		cancel:            cancel,
		ctxDone:           ctx.Done(),
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	// The BDP estimator calls back into updateFlowControl to grow the windows.
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}
	// Push the buffered SETTINGS (and WINDOW_UPDATE) written above onto the
	// wire.
	// NOTE(review): the Flush error is ignored here; a broken connection is
	// presumably detected later by the preface read or by loopy.
	t.framer.writer.Flush()

	// From here on, any error return tears the transport down.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// The loopy writer owns all writes from here on; when it stops, the
	// connection is closed and writerDone is signalled.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
292
293// operateHeader takes action on the decoded headers.
294func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
295	streamID := frame.Header().StreamID
296	state := &decodeState{
297		serverSide: true,
298	}
299	if err := state.decodeHeader(frame); err != nil {
300		if se, ok := status.FromError(err); ok {
301			t.controlBuf.put(&cleanupStream{
302				streamID: streamID,
303				rst:      true,
304				rstCode:  statusCodeConvTab[se.Code()],
305				onWrite:  func() {},
306			})
307		}
308		return false
309	}
310
311	buf := newRecvBuffer()
312	s := &Stream{
313		id:             streamID,
314		st:             t,
315		buf:            buf,
316		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
317		recvCompress:   state.data.encoding,
318		method:         state.data.method,
319		contentSubtype: state.data.contentSubtype,
320	}
321	if frame.StreamEnded() {
322		// s is just created by the caller. No lock needed.
323		s.state = streamReadDone
324	}
325	if state.data.timeoutSet {
326		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
327	} else {
328		s.ctx, s.cancel = context.WithCancel(t.ctx)
329	}
330	pr := &peer.Peer{
331		Addr: t.remoteAddr,
332	}
333	// Attach Auth info if there is any.
334	if t.authInfo != nil {
335		pr.AuthInfo = t.authInfo
336	}
337	s.ctx = peer.NewContext(s.ctx, pr)
338	// Attach the received metadata to the context.
339	if len(state.data.mdata) > 0 {
340		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
341	}
342	if state.data.statsTags != nil {
343		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
344	}
345	if state.data.statsTrace != nil {
346		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
347	}
348	if t.inTapHandle != nil {
349		var err error
350		info := &tap.Info{
351			FullMethodName: state.data.method,
352		}
353		s.ctx, err = t.inTapHandle(s.ctx, info)
354		if err != nil {
355			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
356			t.controlBuf.put(&cleanupStream{
357				streamID: s.id,
358				rst:      true,
359				rstCode:  http2.ErrCodeRefusedStream,
360				onWrite:  func() {},
361			})
362			return false
363		}
364	}
365	t.mu.Lock()
366	if t.state != reachable {
367		t.mu.Unlock()
368		return false
369	}
370	if uint32(len(t.activeStreams)) >= t.maxStreams {
371		t.mu.Unlock()
372		t.controlBuf.put(&cleanupStream{
373			streamID: streamID,
374			rst:      true,
375			rstCode:  http2.ErrCodeRefusedStream,
376			onWrite:  func() {},
377		})
378		return false
379	}
380	if streamID%2 != 1 || streamID <= t.maxStreamID {
381		t.mu.Unlock()
382		// illegal gRPC stream id.
383		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
384		return true
385	}
386	t.maxStreamID = streamID
387	t.activeStreams[streamID] = s
388	if len(t.activeStreams) == 1 {
389		t.idle = time.Time{}
390	}
391	t.mu.Unlock()
392	if channelz.IsOn() {
393		atomic.AddInt64(&t.czData.streamsStarted, 1)
394		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
395	}
396	s.requestRead = func(n int) {
397		t.adjustWindow(s, uint32(n))
398	}
399	s.ctx = traceCtx(s.ctx, s.method)
400	if t.stats != nil {
401		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
402		inHeader := &stats.InHeader{
403			FullMethod:  s.method,
404			RemoteAddr:  t.remoteAddr,
405			LocalAddr:   t.localAddr,
406			Compression: s.recvCompress,
407			WireLength:  int(frame.Header().Length),
408		}
409		t.stats.HandleRPC(s.ctx, inHeader)
410	}
411	s.ctxDone = s.ctx.Done()
412	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
413	s.trReader = &transportReader{
414		reader: &recvBufferReader{
415			ctx:        s.ctx,
416			ctxDone:    s.ctxDone,
417			recv:       s.buf,
418			freeBuffer: t.bufferPool.put,
419		},
420		windowHandler: func(n int) {
421			t.updateWindow(s, uint32(n))
422		},
423	}
424	// Register the stream with loopy.
425	t.controlBuf.put(&registerStream{
426		streamID: s.id,
427		wq:       s.wq,
428	})
429	handle(s)
430	return false
431}
432
433// HandleStreams receives incoming streams using the given handler. This is
434// typically run in a separate goroutine.
435// traceCtx attaches trace to ctx and returns the new context.
436func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
437	defer close(t.readerDone)
438	for {
439		t.controlBuf.throttle()
440		frame, err := t.framer.fr.ReadFrame()
441		atomic.StoreUint32(&t.activity, 1)
442		if err != nil {
443			if se, ok := err.(http2.StreamError); ok {
444				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
445				t.mu.Lock()
446				s := t.activeStreams[se.StreamID]
447				t.mu.Unlock()
448				if s != nil {
449					t.closeStream(s, true, se.Code, false)
450				} else {
451					t.controlBuf.put(&cleanupStream{
452						streamID: se.StreamID,
453						rst:      true,
454						rstCode:  se.Code,
455						onWrite:  func() {},
456					})
457				}
458				continue
459			}
460			if err == io.EOF || err == io.ErrUnexpectedEOF {
461				t.Close()
462				return
463			}
464			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
465			t.Close()
466			return
467		}
468		switch frame := frame.(type) {
469		case *http2.MetaHeadersFrame:
470			if t.operateHeaders(frame, handle, traceCtx) {
471				t.Close()
472				break
473			}
474		case *http2.DataFrame:
475			t.handleData(frame)
476		case *http2.RSTStreamFrame:
477			t.handleRSTStream(frame)
478		case *http2.SettingsFrame:
479			t.handleSettings(frame)
480		case *http2.PingFrame:
481			t.handlePing(frame)
482		case *http2.WindowUpdateFrame:
483			t.handleWindowUpdate(frame)
484		case *http2.GoAwayFrame:
485			// TODO: Handle GoAway from the client appropriately.
486		default:
487			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
488		}
489	}
490}
491
492func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
493	t.mu.Lock()
494	defer t.mu.Unlock()
495	if t.activeStreams == nil {
496		// The transport is closing.
497		return nil, false
498	}
499	s, ok := t.activeStreams[f.Header().StreamID]
500	if !ok {
501		// The stream is already done.
502		return nil, false
503	}
504	return s, true
505}
506
507// adjustWindow sends out extra window update over the initial window size
508// of stream if the application is requesting data larger in size than
509// the window.
510func (t *http2Server) adjustWindow(s *Stream, n uint32) {
511	if w := s.fc.maybeAdjust(n); w > 0 {
512		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
513	}
514
515}
516
517// updateWindow adjusts the inbound quota for the stream and the transport.
518// Window updates will deliver to the controller for sending when
519// the cumulative quota exceeds the corresponding threshold.
520func (t *http2Server) updateWindow(s *Stream, n uint32) {
521	if w := s.fc.onRead(n); w > 0 {
522		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
523			increment: w,
524		})
525	}
526}
527
528// updateFlowControl updates the incoming flow control windows
529// for the transport and the stream based on the current bdp
530// estimation.
531func (t *http2Server) updateFlowControl(n uint32) {
532	t.mu.Lock()
533	for _, s := range t.activeStreams {
534		s.fc.newLimit(n)
535	}
536	t.initialWindowSize = int32(n)
537	t.mu.Unlock()
538	t.controlBuf.put(&outgoingWindowUpdate{
539		streamID:  0,
540		increment: t.fc.newLimit(n),
541	})
542	t.controlBuf.put(&outgoingSettings{
543		ss: []http2.Setting{
544			{
545				ID:  http2.SettingInitialWindowSize,
546				Val: n,
547			},
548		},
549	})
550
551}
552
553func (t *http2Server) handleData(f *http2.DataFrame) {
554	size := f.Header().Length
555	var sendBDPPing bool
556	if t.bdpEst != nil {
557		sendBDPPing = t.bdpEst.add(size)
558	}
559	// Decouple connection's flow control from application's read.
560	// An update on connection's flow control should not depend on
561	// whether user application has read the data or not. Such a
562	// restriction is already imposed on the stream's flow control,
563	// and therefore the sender will be blocked anyways.
564	// Decoupling the connection flow control will prevent other
565	// active(fast) streams from starving in presence of slow or
566	// inactive streams.
567	if w := t.fc.onData(size); w > 0 {
568		t.controlBuf.put(&outgoingWindowUpdate{
569			streamID:  0,
570			increment: w,
571		})
572	}
573	if sendBDPPing {
574		// Avoid excessive ping detection (e.g. in an L7 proxy)
575		// by sending a window update prior to the BDP ping.
576		if w := t.fc.reset(); w > 0 {
577			t.controlBuf.put(&outgoingWindowUpdate{
578				streamID:  0,
579				increment: w,
580			})
581		}
582		t.controlBuf.put(bdpPing)
583	}
584	// Select the right stream to dispatch.
585	s, ok := t.getStream(f)
586	if !ok {
587		return
588	}
589	if size > 0 {
590		if err := s.fc.onData(size); err != nil {
591			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
592			return
593		}
594		if f.Header().Flags.Has(http2.FlagDataPadded) {
595			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
596				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
597			}
598		}
599		// TODO(bradfitz, zhaoq): A copy is required here because there is no
600		// guarantee f.Data() is consumed before the arrival of next frame.
601		// Can this copy be eliminated?
602		if len(f.Data()) > 0 {
603			buffer := t.bufferPool.get()
604			buffer.Reset()
605			buffer.Write(f.Data())
606			s.write(recvMsg{buffer: buffer})
607		}
608	}
609	if f.Header().Flags.Has(http2.FlagDataEndStream) {
610		// Received the end of stream from the client.
611		s.compareAndSwapState(streamActive, streamReadDone)
612		s.write(recvMsg{err: io.EOF})
613	}
614}
615
616func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
617	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
618	if s, ok := t.getStream(f); ok {
619		t.closeStream(s, false, 0, false)
620		return
621	}
622	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
623	t.controlBuf.put(&cleanupStream{
624		streamID: f.Header().StreamID,
625		rst:      false,
626		rstCode:  0,
627		onWrite:  func() {},
628	})
629}
630
631func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
632	if f.IsAck() {
633		return
634	}
635	var ss []http2.Setting
636	var updateFuncs []func()
637	f.ForeachSetting(func(s http2.Setting) error {
638		switch s.ID {
639		case http2.SettingMaxHeaderListSize:
640			updateFuncs = append(updateFuncs, func() {
641				t.maxSendHeaderListSize = new(uint32)
642				*t.maxSendHeaderListSize = s.Val
643			})
644		default:
645			ss = append(ss, s)
646		}
647		return nil
648	})
649	t.controlBuf.executeAndPut(func(interface{}) bool {
650		for _, f := range updateFuncs {
651			f()
652		}
653		return true
654	}, &incomingSettings{
655		ss: ss,
656	})
657}
658
// maxPingStrikes is the number of keepalive-policy violations tolerated from a
// client. One more makes handlePing queue a GOAWAY(ENHANCE_YOUR_CALM) that
// closes the connection.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing expected between client pings
// while there are no active streams and the enforcement policy does not
// permit keepalive without streams.
const defaultPingTimeout = 2 * time.Hour
663
664func (t *http2Server) handlePing(f *http2.PingFrame) {
665	if f.IsAck() {
666		if f.Data == goAwayPing.data && t.drainChan != nil {
667			close(t.drainChan)
668			return
669		}
670		// Maybe it's a BDP ping.
671		if t.bdpEst != nil {
672			t.bdpEst.calculate(f.Data)
673		}
674		return
675	}
676	pingAck := &ping{ack: true}
677	copy(pingAck.data[:], f.Data[:])
678	t.controlBuf.put(pingAck)
679
680	now := time.Now()
681	defer func() {
682		t.lastPingAt = now
683	}()
684	// A reset ping strikes means that we don't need to check for policy
685	// violation for this ping and the pingStrikes counter should be set
686	// to 0.
687	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
688		t.pingStrikes = 0
689		return
690	}
691	t.mu.Lock()
692	ns := len(t.activeStreams)
693	t.mu.Unlock()
694	if ns < 1 && !t.kep.PermitWithoutStream {
695		// Keepalive shouldn't be active thus, this new ping should
696		// have come after at least defaultPingTimeout.
697		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
698			t.pingStrikes++
699		}
700	} else {
701		// Check if keepalive policy is respected.
702		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
703			t.pingStrikes++
704		}
705	}
706
707	if t.pingStrikes > maxPingStrikes {
708		// Send goaway and close the connection.
709		errorf("transport: Got too many pings from the client, closing the connection.")
710		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
711	}
712}
713
714func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
715	t.controlBuf.put(&incomingWindowUpdate{
716		streamID:  f.Header().StreamID,
717		increment: f.Increment,
718	})
719}
720
721func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
722	for k, vv := range md {
723		if isReservedHeader(k) {
724			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
725			continue
726		}
727		for _, v := range vv {
728			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
729		}
730	}
731	return headerFields
732}
733
734func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
735	if t.maxSendHeaderListSize == nil {
736		return true
737	}
738	hdrFrame := it.(*headerFrame)
739	var sz int64
740	for _, f := range hdrFrame.hf {
741		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
742			errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
743			return false
744		}
745	}
746	return true
747}
748
749// WriteHeader sends the header metedata md back to the client.
750func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
751	if s.updateHeaderSent() || s.getState() == streamDone {
752		return ErrIllegalHeaderWrite
753	}
754	s.hdrMu.Lock()
755	if md.Len() > 0 {
756		if s.header.Len() > 0 {
757			s.header = metadata.Join(s.header, md)
758		} else {
759			s.header = md
760		}
761	}
762	if err := t.writeHeaderLocked(s); err != nil {
763		s.hdrMu.Unlock()
764		return err
765	}
766	s.hdrMu.Unlock()
767	return nil
768}
769
770func (t *http2Server) setResetPingStrikes() {
771	atomic.StoreUint32(&t.resetPingStrikes, 1)
772}
773
774func (t *http2Server) writeHeaderLocked(s *Stream) error {
775	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
776	// first and create a slice of that exact size.
777	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
778	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
779	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
780	if s.sendCompress != "" {
781		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
782	}
783	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
784	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
785		streamID:  s.id,
786		hf:        headerFields,
787		endStream: false,
788		onWrite:   t.setResetPingStrikes,
789	})
790	if !success {
791		if err != nil {
792			return err
793		}
794		t.closeStream(s, true, http2.ErrCodeInternal, false)
795		return ErrHeaderListSizeLimitViolation
796	}
797	if t.stats != nil {
798		// Note: WireLength is not set in outHeader.
799		// TODO(mmukhi): Revisit this later, if needed.
800		outHeader := &stats.OutHeader{}
801		t.stats.HandleRPC(s.Context(), outHeader)
802	}
803	return nil
804}
805
// WriteStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream afterwards.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
//
// It is a no-op (returning nil) if the stream is already streamDone. If no
// headers have been sent yet, a non-empty s.header is first written as its
// own HEADERS frame. Otherwise ":status" and "content-type" are folded into
// the trailer, producing a trailers-only response.
//
// The trailer carries grpc-status, grpc-message, grpc-status-details-bin
// (only when st has details that marshal successfully) and the trailer
// metadata s.trailer. If the trailer exceeds the client's max header list
// size, the stream is reset with INTERNAL and ErrHeaderListSizeLimitViolation
// is returned.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	s.hdrMu.Lock()
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// grpc-status-details-bin carries the whole google.rpc.Status proto. It is
	// only sent when there are details, and is dropped (with a log) if the
	// proto cannot be marshaled.
	if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}
	// s.header and s.trailer are not touched below, so release hdrMu before
	// going through controlBuf.
	s.hdrMu.Unlock()
	// Only the size check runs here. trailingHeader itself is handed to
	// finishStream below (presumably to be queued together with the stream
	// cleanup).
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
	}
	return nil
}
867
868// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
869// is returns if it fails (e.g., framing error, transport error).
870func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
871	if !s.isHeaderSent() { // Headers haven't been written yet.
872		if err := t.WriteHeader(s, nil); err != nil {
873			if _, ok := err.(ConnectionError); ok {
874				return err
875			}
876			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
877			return status.Errorf(codes.Internal, "transport: %v", err)
878		}
879	} else {
880		// Writing headers checks for this condition.
881		if s.getState() == streamDone {
882			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
883			s.cancel()
884			select {
885			case <-t.ctx.Done():
886				return ErrConnClosing
887			default:
888			}
889			return ContextErr(s.ctx.Err())
890		}
891	}
892	// Add some data to header frame so that we can equally distribute bytes across frames.
893	emptyLen := http2MaxFrameLen - len(hdr)
894	if emptyLen > len(data) {
895		emptyLen = len(data)
896	}
897	hdr = append(hdr, data[:emptyLen]...)
898	data = data[emptyLen:]
899	df := &dataFrame{
900		streamID:    s.id,
901		h:           hdr,
902		d:           data,
903		onEachWrite: t.setResetPingStrikes,
904	}
905	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
906		select {
907		case <-t.ctx.Done():
908			return ErrConnClosing
909		default:
910		}
911		return ContextErr(s.ctx.Err())
912	}
913	return t.controlBuf.put(df)
914}
915
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// It returns once one of these actions completes, or when t.ctx is done.
// "Activity" for item 4 is the t.activity flag, which the reader sets on
// every frame read. A ping is sent only after a keepalive.Time period in which
// no frame was read. The connection is closed if nothing is read within
// keepalive.Timeout after that ping.
func (t *http2Server) keepalive() {
	p := &ping{}
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	// Each timer must either still be armed (so Stop returns true) or hold
	// an unread value in its channel when this runs. A timer whose value was
	// already received and that was not re-armed would block the receive
	// below forever.
	defer func() {
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// Idle, but possibly not for the full MaxConnectionIdle yet:
			// compute the remaining time.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			maxIdle.Reset(val)
		case <-maxAge.C:
			// Start a graceful drain, then give in-flight RPCs
			// MaxConnectionAgeGrace before closing the transport.
			t.drain(http2.ErrCodeNo, []byte{})
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				infof("transport: closing server transport due to maximum connection age.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.ctx.Done():
			}
			return
		case <-keepalive.C:
			// Frames were read since the last tick: the peer is alive, so wait
			// another full period.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			// Nothing has been read since the ping was sent kp.Timeout ago.
			if pingSent {
				infof("transport: closing server transport due to idleness.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			pingSent = true
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.kpCount, 1)
			}
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.ctx.Done():
			return
		}
	}
}
1000
1001// Close starts shutting down the http2Server transport.
1002// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1003// could cause some resource issue. Revisit this later.
1004func (t *http2Server) Close() error {
1005	t.mu.Lock()
1006	if t.state == closing {
1007		t.mu.Unlock()
1008		return errors.New("transport: Close() was already called")
1009	}
1010	t.state = closing
1011	streams := t.activeStreams
1012	t.activeStreams = nil
1013	t.mu.Unlock()
1014	t.controlBuf.finish()
1015	t.cancel()
1016	err := t.conn.Close()
1017	if channelz.IsOn() {
1018		channelz.RemoveEntry(t.channelzID)
1019	}
1020	// Cancel all active streams.
1021	for _, s := range streams {
1022		s.cancel()
1023	}
1024	if t.stats != nil {
1025		connEnd := &stats.ConnEnd{}
1026		t.stats.HandleConn(t.ctx, connEnd)
1027	}
1028	return err
1029}
1030
1031// deleteStream deletes the stream s from transport's active streams.
1032func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1033	// In case stream sending and receiving are invoked in separate
1034	// goroutines (e.g., bi-directional streaming), cancel needs to be
1035	// called to interrupt the potential blocking on other goroutines.
1036	s.cancel()
1037
1038	t.mu.Lock()
1039	if _, ok := t.activeStreams[s.id]; ok {
1040		delete(t.activeStreams, s.id)
1041		if len(t.activeStreams) == 0 {
1042			t.idle = time.Now()
1043		}
1044	}
1045	t.mu.Unlock()
1046
1047	if channelz.IsOn() {
1048		if eosReceived {
1049			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1050		} else {
1051			atomic.AddInt64(&t.czData.streamsFailed, 1)
1052		}
1053	}
1054}
1055
1056// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1057func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1058	oldState := s.swapState(streamDone)
1059	if oldState == streamDone {
1060		// If the stream was already done, return.
1061		return
1062	}
1063
1064	hdr.cleanup = &cleanupStream{
1065		streamID: s.id,
1066		rst:      rst,
1067		rstCode:  rstCode,
1068		onWrite: func() {
1069			t.deleteStream(s, eosReceived)
1070		},
1071	}
1072	t.controlBuf.put(hdr)
1073}
1074
1075// closeStream clears the footprint of a stream when the stream is not needed any more.
1076func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1077	s.swapState(streamDone)
1078	t.deleteStream(s, eosReceived)
1079
1080	t.controlBuf.put(&cleanupStream{
1081		streamID: s.id,
1082		rst:      rst,
1083		rstCode:  rstCode,
1084		onWrite:  func() {},
1085	})
1086}
1087
1088func (t *http2Server) RemoteAddr() net.Addr {
1089	return t.remoteAddr
1090}
1091
1092func (t *http2Server) Drain() {
1093	t.drain(http2.ErrCodeNo, []byte{})
1094}
1095
1096func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1097	t.mu.Lock()
1098	defer t.mu.Unlock()
1099	if t.drainChan != nil {
1100		return
1101	}
1102	t.drainChan = make(chan struct{})
1103	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1104}
1105
// goAwayPing's payload is written as a PING frame right after the heads-up
// GoAway in outgoingGoAwayHandler. The fixed, distinctive payload presumably
// lets the ping-ack handler recognize the ack for this specific ping and
// signal t.drainChan. NOTE(review): confirm against the ping handling code.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1107
1108// Handles outgoing GoAway and returns true if loopy needs to put itself
1109// in draining mode.
1110func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1111	t.mu.Lock()
1112	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1113		t.mu.Unlock()
1114		// The transport is closing.
1115		return false, ErrConnClosing
1116	}
1117	sid := t.maxStreamID
1118	if !g.headsUp {
1119		// Stop accepting more streams now.
1120		t.state = draining
1121		if len(t.activeStreams) == 0 {
1122			g.closeConn = true
1123		}
1124		t.mu.Unlock()
1125		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1126			return false, err
1127		}
1128		if g.closeConn {
1129			// Abruptly close the connection following the GoAway (via
1130			// loopywriter).  But flush out what's inside the buffer first.
1131			t.framer.writer.Flush()
1132			return false, fmt.Errorf("transport: Connection closing")
1133		}
1134		return true, nil
1135	}
1136	t.mu.Unlock()
1137	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1138	// Follow that with a ping and wait for the ack to come back or a timer
1139	// to expire. During this time accept new streams since they might have
1140	// originated before the GoAway reaches the client.
1141	// After getting the ack or timer expiration send out another GoAway this
1142	// time with an ID of the max stream server intends to process.
1143	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1144		return false, err
1145	}
1146	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1147		return false, err
1148	}
1149	go func() {
1150		timer := time.NewTimer(time.Minute)
1151		defer timer.Stop()
1152		select {
1153		case <-t.drainChan:
1154		case <-timer.C:
1155		case <-t.ctx.Done():
1156			return
1157		}
1158		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1159	}()
1160	return false, nil
1161}
1162
1163func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1164	s := channelz.SocketInternalMetric{
1165		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1166		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1167		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1168		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1169		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1170		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1171		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1172		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1173		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1174		LocalFlowControlWindow:           int64(t.fc.getSize()),
1175		SocketOptions:                    channelz.GetSocketOption(t.conn),
1176		LocalAddr:                        t.localAddr,
1177		RemoteAddr:                       t.remoteAddr,
1178		// RemoteName :
1179	}
1180	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1181		s.Security = au.GetSecurityValue()
1182	}
1183	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1184	return &s
1185}
1186
1187func (t *http2Server) IncrMsgSent() {
1188	atomic.AddInt64(&t.czData.msgSent, 1)
1189	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1190}
1191
1192func (t *http2Server) IncrMsgRecv() {
1193	atomic.AddInt64(&t.czData.msgRecv, 1)
1194	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1195}
1196
1197func (t *http2Server) getOutFlowWindow() int64 {
1198	resp := make(chan uint32, 1)
1199	timer := time.NewTimer(time.Second)
1200	defer timer.Stop()
1201	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1202	select {
1203	case sz := <-resp:
1204		return int64(sz)
1205	case <-t.ctxDone:
1206		return -1
1207	case <-timer.C:
1208		return -2
1209	}
1210}
1211
1212func getJitter(v time.Duration) time.Duration {
1213	if v == infinity {
1214		return 0
1215	}
1216	// Generate a jitter between +/- 10% of the value.
1217	r := int64(v / 10)
1218	j := grpcrand.Int63n(2*r) - r
1219	return time.Duration(j)
1220}
1221