1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"context"
24	"errors"
25	"fmt"
26	"io"
27	"math"
28	"net"
29	"strconv"
30	"sync"
31	"sync/atomic"
32	"time"
33
34	"github.com/golang/protobuf/proto"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37
38	spb "google.golang.org/genproto/googleapis/rpc/status"
39	"google.golang.org/grpc/codes"
40	"google.golang.org/grpc/credentials"
41	"google.golang.org/grpc/grpclog"
42	"google.golang.org/grpc/internal"
43	"google.golang.org/grpc/internal/channelz"
44	"google.golang.org/grpc/internal/grpcrand"
45	"google.golang.org/grpc/keepalive"
46	"google.golang.org/grpc/metadata"
47	"google.golang.org/grpc/peer"
48	"google.golang.org/grpc/stats"
49	"google.golang.org/grpc/status"
50	"google.golang.org/grpc/tap"
51)
52
53var (
54	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
55	// the stream's state.
56	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58	// than the limit set by peer.
59	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60	// statusRawProto is a function to get to the raw status proto wrapped in a
61	// status.Status without a proto.Clone().
62	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
63)
64
65// http2Server implements the ServerTransport interface with HTTP2.
66type http2Server struct {
67	ctx         context.Context
68	ctxDone     <-chan struct{} // Cache the context.Done() chan
69	cancel      context.CancelFunc
70	conn        net.Conn
71	loopy       *loopyWriter
72	readerDone  chan struct{} // sync point to enable testing.
73	writerDone  chan struct{} // sync point to enable testing.
74	remoteAddr  net.Addr
75	localAddr   net.Addr
76	maxStreamID uint32               // max stream ID ever seen
77	authInfo    credentials.AuthInfo // auth info about the connection
78	inTapHandle tap.ServerInHandle
79	framer      *framer
80	// The max number of concurrent streams.
81	maxStreams uint32
82	// controlBuf delivers all the control related tasks (e.g., window
83	// updates, reset streams, and various settings) to the controller.
84	controlBuf *controlBuffer
85	fc         *trInFlow
86	stats      stats.Handler
87	// Flag to keep track of reading activity on transport.
88	// 1 is true and 0 is false.
89	activity uint32 // Accessed atomically.
90	// Keepalive and max-age parameters for the server.
91	kp keepalive.ServerParameters
92
93	// Keepalive enforcement policy.
94	kep keepalive.EnforcementPolicy
95	// The time instance last ping was received.
96	lastPingAt time.Time
97	// Number of times the client has violated keepalive ping policy so far.
98	pingStrikes uint8
99	// Flag to signify that number of ping strikes should be reset to 0.
100	// This is set whenever data or header frames are sent.
101	// 1 means yes.
102	resetPingStrikes      uint32 // Accessed atomically.
103	initialWindowSize     int32
104	bdpEst                *bdpEstimator
105	maxSendHeaderListSize *uint32
106
107	mu sync.Mutex // guard the following
108
109	// drainChan is initialized when drain(...) is called the first time.
110	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
111	// Then an independent goroutine will be launched to later send the second GoAway.
112	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
113	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
114	// already underway.
115	drainChan     chan struct{}
116	state         transportState
117	activeStreams map[uint32]*Stream
118	// idle is the time instant when the connection went idle.
119	// This is either the beginning of the connection or when the number of
120	// RPCs go down to 0.
121	// When the connection is busy, this value is set to 0.
122	idle time.Time
123
124	// Fields below are for channelz metric collection.
125	channelzID int64 // channelz unique identification number
126	czData     *channelzData
127	bufferPool *bufferPool
128}
129
130// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
131// returned if something goes wrong.
132func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
133	writeBufSize := config.WriteBufferSize
134	readBufSize := config.ReadBufferSize
135	maxHeaderListSize := defaultServerMaxHeaderListSize
136	if config.MaxHeaderListSize != nil {
137		maxHeaderListSize = *config.MaxHeaderListSize
138	}
139	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
140	// Send initial settings as connection preface to client.
141	var isettings []http2.Setting
142	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
143	// permitted in the HTTP2 spec.
144	maxStreams := config.MaxStreams
145	if maxStreams == 0 {
146		maxStreams = math.MaxUint32
147	} else {
148		isettings = append(isettings, http2.Setting{
149			ID:  http2.SettingMaxConcurrentStreams,
150			Val: maxStreams,
151		})
152	}
153	dynamicWindow := true
154	iwz := int32(initialWindowSize)
155	if config.InitialWindowSize >= defaultWindowSize {
156		iwz = config.InitialWindowSize
157		dynamicWindow = false
158	}
159	icwz := int32(initialWindowSize)
160	if config.InitialConnWindowSize >= defaultWindowSize {
161		icwz = config.InitialConnWindowSize
162		dynamicWindow = false
163	}
164	if iwz != defaultWindowSize {
165		isettings = append(isettings, http2.Setting{
166			ID:  http2.SettingInitialWindowSize,
167			Val: uint32(iwz)})
168	}
169	if config.MaxHeaderListSize != nil {
170		isettings = append(isettings, http2.Setting{
171			ID:  http2.SettingMaxHeaderListSize,
172			Val: *config.MaxHeaderListSize,
173		})
174	}
175	if err := framer.fr.WriteSettings(isettings...); err != nil {
176		return nil, connectionErrorf(false, err, "transport: %v", err)
177	}
178	// Adjust the connection flow control window if needed.
179	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
180		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
181			return nil, connectionErrorf(false, err, "transport: %v", err)
182		}
183	}
184	kp := config.KeepaliveParams
185	if kp.MaxConnectionIdle == 0 {
186		kp.MaxConnectionIdle = defaultMaxConnectionIdle
187	}
188	if kp.MaxConnectionAge == 0 {
189		kp.MaxConnectionAge = defaultMaxConnectionAge
190	}
191	// Add a jitter to MaxConnectionAge.
192	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
193	if kp.MaxConnectionAgeGrace == 0 {
194		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
195	}
196	if kp.Time == 0 {
197		kp.Time = defaultServerKeepaliveTime
198	}
199	if kp.Timeout == 0 {
200		kp.Timeout = defaultServerKeepaliveTimeout
201	}
202	kep := config.KeepalivePolicy
203	if kep.MinTime == 0 {
204		kep.MinTime = defaultKeepalivePolicyMinTime
205	}
206	ctx, cancel := context.WithCancel(context.Background())
207	t := &http2Server{
208		ctx:               ctx,
209		cancel:            cancel,
210		ctxDone:           ctx.Done(),
211		conn:              conn,
212		remoteAddr:        conn.RemoteAddr(),
213		localAddr:         conn.LocalAddr(),
214		authInfo:          config.AuthInfo,
215		framer:            framer,
216		readerDone:        make(chan struct{}),
217		writerDone:        make(chan struct{}),
218		maxStreams:        maxStreams,
219		inTapHandle:       config.InTapHandle,
220		fc:                &trInFlow{limit: uint32(icwz)},
221		state:             reachable,
222		activeStreams:     make(map[uint32]*Stream),
223		stats:             config.StatsHandler,
224		kp:                kp,
225		idle:              time.Now(),
226		kep:               kep,
227		initialWindowSize: iwz,
228		czData:            new(channelzData),
229		bufferPool:        newBufferPool(),
230	}
231	t.controlBuf = newControlBuffer(t.ctxDone)
232	if dynamicWindow {
233		t.bdpEst = &bdpEstimator{
234			bdp:               initialWindowSize,
235			updateFlowControl: t.updateFlowControl,
236		}
237	}
238	if t.stats != nil {
239		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
240			RemoteAddr: t.remoteAddr,
241			LocalAddr:  t.localAddr,
242		})
243		connBegin := &stats.ConnBegin{}
244		t.stats.HandleConn(t.ctx, connBegin)
245	}
246	if channelz.IsOn() {
247		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
248	}
249	t.framer.writer.Flush()
250
251	defer func() {
252		if err != nil {
253			t.Close()
254		}
255	}()
256
257	// Check the validity of client preface.
258	preface := make([]byte, len(clientPreface))
259	if _, err := io.ReadFull(t.conn, preface); err != nil {
260		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
261	}
262	if !bytes.Equal(preface, clientPreface) {
263		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
264	}
265
266	frame, err := t.framer.fr.ReadFrame()
267	if err == io.EOF || err == io.ErrUnexpectedEOF {
268		return nil, err
269	}
270	if err != nil {
271		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
272	}
273	atomic.StoreUint32(&t.activity, 1)
274	sf, ok := frame.(*http2.SettingsFrame)
275	if !ok {
276		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
277	}
278	t.handleSettings(sf)
279
280	go func() {
281		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
282		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
283		if err := t.loopy.run(); err != nil {
284			errorf("transport: loopyWriter.run returning. Err: %v", err)
285		}
286		t.conn.Close()
287		close(t.writerDone)
288	}()
289	go t.keepalive()
290	return t, nil
291}
292
293// operateHeader takes action on the decoded headers.
294func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
295	streamID := frame.Header().StreamID
296	state := &decodeState{
297		serverSide: true,
298	}
299	if err := state.decodeHeader(frame); err != nil {
300		if se, ok := status.FromError(err); ok {
301			t.controlBuf.put(&cleanupStream{
302				streamID: streamID,
303				rst:      true,
304				rstCode:  statusCodeConvTab[se.Code()],
305				onWrite:  func() {},
306			})
307		}
308		return false
309	}
310
311	buf := newRecvBuffer()
312	s := &Stream{
313		id:             streamID,
314		st:             t,
315		buf:            buf,
316		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
317		recvCompress:   state.data.encoding,
318		method:         state.data.method,
319		contentSubtype: state.data.contentSubtype,
320	}
321	if frame.StreamEnded() {
322		// s is just created by the caller. No lock needed.
323		s.state = streamReadDone
324	}
325	if state.data.timeoutSet {
326		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
327	} else {
328		s.ctx, s.cancel = context.WithCancel(t.ctx)
329	}
330	pr := &peer.Peer{
331		Addr: t.remoteAddr,
332	}
333	// Attach Auth info if there is any.
334	if t.authInfo != nil {
335		pr.AuthInfo = t.authInfo
336	}
337	s.ctx = peer.NewContext(s.ctx, pr)
338	// Attach the received metadata to the context.
339	if len(state.data.mdata) > 0 {
340		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
341	}
342	if state.data.statsTags != nil {
343		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
344	}
345	if state.data.statsTrace != nil {
346		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
347	}
348	if t.inTapHandle != nil {
349		var err error
350		info := &tap.Info{
351			FullMethodName: state.data.method,
352		}
353		s.ctx, err = t.inTapHandle(s.ctx, info)
354		if err != nil {
355			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
356			t.controlBuf.put(&cleanupStream{
357				streamID: s.id,
358				rst:      true,
359				rstCode:  http2.ErrCodeRefusedStream,
360				onWrite:  func() {},
361			})
362			return false
363		}
364	}
365	t.mu.Lock()
366	if t.state != reachable {
367		t.mu.Unlock()
368		return false
369	}
370	if uint32(len(t.activeStreams)) >= t.maxStreams {
371		t.mu.Unlock()
372		t.controlBuf.put(&cleanupStream{
373			streamID: streamID,
374			rst:      true,
375			rstCode:  http2.ErrCodeRefusedStream,
376			onWrite:  func() {},
377		})
378		return false
379	}
380	if streamID%2 != 1 || streamID <= t.maxStreamID {
381		t.mu.Unlock()
382		// illegal gRPC stream id.
383		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
384		return true
385	}
386	t.maxStreamID = streamID
387	t.activeStreams[streamID] = s
388	if len(t.activeStreams) == 1 {
389		t.idle = time.Time{}
390	}
391	t.mu.Unlock()
392	if channelz.IsOn() {
393		atomic.AddInt64(&t.czData.streamsStarted, 1)
394		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
395	}
396	s.requestRead = func(n int) {
397		t.adjustWindow(s, uint32(n))
398	}
399	s.ctx = traceCtx(s.ctx, s.method)
400	if t.stats != nil {
401		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
402		inHeader := &stats.InHeader{
403			FullMethod:  s.method,
404			RemoteAddr:  t.remoteAddr,
405			LocalAddr:   t.localAddr,
406			Compression: s.recvCompress,
407			WireLength:  int(frame.Header().Length),
408		}
409		t.stats.HandleRPC(s.ctx, inHeader)
410	}
411	s.ctxDone = s.ctx.Done()
412	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
413	s.trReader = &transportReader{
414		reader: &recvBufferReader{
415			ctx:        s.ctx,
416			ctxDone:    s.ctxDone,
417			recv:       s.buf,
418			freeBuffer: t.bufferPool.put,
419		},
420		windowHandler: func(n int) {
421			t.updateWindow(s, uint32(n))
422		},
423	}
424	// Register the stream with loopy.
425	t.controlBuf.put(&registerStream{
426		streamID: s.id,
427		wq:       s.wq,
428	})
429	handle(s)
430	return false
431}
432
433// HandleStreams receives incoming streams using the given handler. This is
434// typically run in a separate goroutine.
435// traceCtx attaches trace to ctx and returns the new context.
436func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
437	defer close(t.readerDone)
438	for {
439		frame, err := t.framer.fr.ReadFrame()
440		atomic.StoreUint32(&t.activity, 1)
441		if err != nil {
442			if se, ok := err.(http2.StreamError); ok {
443				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
444				t.mu.Lock()
445				s := t.activeStreams[se.StreamID]
446				t.mu.Unlock()
447				if s != nil {
448					t.closeStream(s, true, se.Code, false)
449				} else {
450					t.controlBuf.put(&cleanupStream{
451						streamID: se.StreamID,
452						rst:      true,
453						rstCode:  se.Code,
454						onWrite:  func() {},
455					})
456				}
457				continue
458			}
459			if err == io.EOF || err == io.ErrUnexpectedEOF {
460				t.Close()
461				return
462			}
463			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
464			t.Close()
465			return
466		}
467		switch frame := frame.(type) {
468		case *http2.MetaHeadersFrame:
469			if t.operateHeaders(frame, handle, traceCtx) {
470				t.Close()
471				break
472			}
473		case *http2.DataFrame:
474			t.handleData(frame)
475		case *http2.RSTStreamFrame:
476			t.handleRSTStream(frame)
477		case *http2.SettingsFrame:
478			t.handleSettings(frame)
479		case *http2.PingFrame:
480			t.handlePing(frame)
481		case *http2.WindowUpdateFrame:
482			t.handleWindowUpdate(frame)
483		case *http2.GoAwayFrame:
484			// TODO: Handle GoAway from the client appropriately.
485		default:
486			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
487		}
488	}
489}
490
491func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
492	t.mu.Lock()
493	defer t.mu.Unlock()
494	if t.activeStreams == nil {
495		// The transport is closing.
496		return nil, false
497	}
498	s, ok := t.activeStreams[f.Header().StreamID]
499	if !ok {
500		// The stream is already done.
501		return nil, false
502	}
503	return s, true
504}
505
506// adjustWindow sends out extra window update over the initial window size
507// of stream if the application is requesting data larger in size than
508// the window.
509func (t *http2Server) adjustWindow(s *Stream, n uint32) {
510	if w := s.fc.maybeAdjust(n); w > 0 {
511		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
512	}
513
514}
515
516// updateWindow adjusts the inbound quota for the stream and the transport.
517// Window updates will deliver to the controller for sending when
518// the cumulative quota exceeds the corresponding threshold.
519func (t *http2Server) updateWindow(s *Stream, n uint32) {
520	if w := s.fc.onRead(n); w > 0 {
521		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
522			increment: w,
523		})
524	}
525}
526
527// updateFlowControl updates the incoming flow control windows
528// for the transport and the stream based on the current bdp
529// estimation.
530func (t *http2Server) updateFlowControl(n uint32) {
531	t.mu.Lock()
532	for _, s := range t.activeStreams {
533		s.fc.newLimit(n)
534	}
535	t.initialWindowSize = int32(n)
536	t.mu.Unlock()
537	t.controlBuf.put(&outgoingWindowUpdate{
538		streamID:  0,
539		increment: t.fc.newLimit(n),
540	})
541	t.controlBuf.put(&outgoingSettings{
542		ss: []http2.Setting{
543			{
544				ID:  http2.SettingInitialWindowSize,
545				Val: n,
546			},
547		},
548	})
549
550}
551
552func (t *http2Server) handleData(f *http2.DataFrame) {
553	size := f.Header().Length
554	var sendBDPPing bool
555	if t.bdpEst != nil {
556		sendBDPPing = t.bdpEst.add(size)
557	}
558	// Decouple connection's flow control from application's read.
559	// An update on connection's flow control should not depend on
560	// whether user application has read the data or not. Such a
561	// restriction is already imposed on the stream's flow control,
562	// and therefore the sender will be blocked anyways.
563	// Decoupling the connection flow control will prevent other
564	// active(fast) streams from starving in presence of slow or
565	// inactive streams.
566	if w := t.fc.onData(size); w > 0 {
567		t.controlBuf.put(&outgoingWindowUpdate{
568			streamID:  0,
569			increment: w,
570		})
571	}
572	if sendBDPPing {
573		// Avoid excessive ping detection (e.g. in an L7 proxy)
574		// by sending a window update prior to the BDP ping.
575		if w := t.fc.reset(); w > 0 {
576			t.controlBuf.put(&outgoingWindowUpdate{
577				streamID:  0,
578				increment: w,
579			})
580		}
581		t.controlBuf.put(bdpPing)
582	}
583	// Select the right stream to dispatch.
584	s, ok := t.getStream(f)
585	if !ok {
586		return
587	}
588	if size > 0 {
589		if err := s.fc.onData(size); err != nil {
590			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
591			return
592		}
593		if f.Header().Flags.Has(http2.FlagDataPadded) {
594			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
595				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
596			}
597		}
598		// TODO(bradfitz, zhaoq): A copy is required here because there is no
599		// guarantee f.Data() is consumed before the arrival of next frame.
600		// Can this copy be eliminated?
601		if len(f.Data()) > 0 {
602			buffer := t.bufferPool.get()
603			buffer.Reset()
604			buffer.Write(f.Data())
605			s.write(recvMsg{buffer: buffer})
606		}
607	}
608	if f.Header().Flags.Has(http2.FlagDataEndStream) {
609		// Received the end of stream from the client.
610		s.compareAndSwapState(streamActive, streamReadDone)
611		s.write(recvMsg{err: io.EOF})
612	}
613}
614
615func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
616	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
617	if s, ok := t.getStream(f); ok {
618		t.closeStream(s, false, 0, false)
619		return
620	}
621	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
622	t.controlBuf.put(&cleanupStream{
623		streamID: f.Header().StreamID,
624		rst:      false,
625		rstCode:  0,
626		onWrite:  func() {},
627	})
628}
629
630func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
631	if f.IsAck() {
632		return
633	}
634	var ss []http2.Setting
635	var updateFuncs []func()
636	f.ForeachSetting(func(s http2.Setting) error {
637		switch s.ID {
638		case http2.SettingMaxHeaderListSize:
639			updateFuncs = append(updateFuncs, func() {
640				t.maxSendHeaderListSize = new(uint32)
641				*t.maxSendHeaderListSize = s.Val
642			})
643		default:
644			ss = append(ss, s)
645		}
646		return nil
647	})
648	t.controlBuf.executeAndPut(func(interface{}) bool {
649		for _, f := range updateFuncs {
650			f()
651		}
652		return true
653	}, &incomingSettings{
654		ss: ss,
655	})
656}
657
658const (
659	maxPingStrikes     = 2
660	defaultPingTimeout = 2 * time.Hour
661)
662
663func (t *http2Server) handlePing(f *http2.PingFrame) {
664	if f.IsAck() {
665		if f.Data == goAwayPing.data && t.drainChan != nil {
666			close(t.drainChan)
667			return
668		}
669		// Maybe it's a BDP ping.
670		if t.bdpEst != nil {
671			t.bdpEst.calculate(f.Data)
672		}
673		return
674	}
675	pingAck := &ping{ack: true}
676	copy(pingAck.data[:], f.Data[:])
677	t.controlBuf.put(pingAck)
678
679	now := time.Now()
680	defer func() {
681		t.lastPingAt = now
682	}()
683	// A reset ping strikes means that we don't need to check for policy
684	// violation for this ping and the pingStrikes counter should be set
685	// to 0.
686	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
687		t.pingStrikes = 0
688		return
689	}
690	t.mu.Lock()
691	ns := len(t.activeStreams)
692	t.mu.Unlock()
693	if ns < 1 && !t.kep.PermitWithoutStream {
694		// Keepalive shouldn't be active thus, this new ping should
695		// have come after at least defaultPingTimeout.
696		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
697			t.pingStrikes++
698		}
699	} else {
700		// Check if keepalive policy is respected.
701		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
702			t.pingStrikes++
703		}
704	}
705
706	if t.pingStrikes > maxPingStrikes {
707		// Send goaway and close the connection.
708		errorf("transport: Got too many pings from the client, closing the connection.")
709		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
710	}
711}
712
713func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
714	t.controlBuf.put(&incomingWindowUpdate{
715		streamID:  f.Header().StreamID,
716		increment: f.Increment,
717	})
718}
719
720func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
721	for k, vv := range md {
722		if isReservedHeader(k) {
723			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
724			continue
725		}
726		for _, v := range vv {
727			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
728		}
729	}
730	return headerFields
731}
732
733func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
734	if t.maxSendHeaderListSize == nil {
735		return true
736	}
737	hdrFrame := it.(*headerFrame)
738	var sz int64
739	for _, f := range hdrFrame.hf {
740		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
741			errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
742			return false
743		}
744	}
745	return true
746}
747
748// WriteHeader sends the header metedata md back to the client.
749func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
750	if s.updateHeaderSent() || s.getState() == streamDone {
751		return ErrIllegalHeaderWrite
752	}
753	s.hdrMu.Lock()
754	if md.Len() > 0 {
755		if s.header.Len() > 0 {
756			s.header = metadata.Join(s.header, md)
757		} else {
758			s.header = md
759		}
760	}
761	if err := t.writeHeaderLocked(s); err != nil {
762		s.hdrMu.Unlock()
763		return err
764	}
765	s.hdrMu.Unlock()
766	return nil
767}
768
769func (t *http2Server) writeHeaderLocked(s *Stream) error {
770	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
771	// first and create a slice of that exact size.
772	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
773	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
774	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
775	if s.sendCompress != "" {
776		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
777	}
778	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
779	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
780		streamID:  s.id,
781		hf:        headerFields,
782		endStream: false,
783		onWrite: func() {
784			atomic.StoreUint32(&t.resetPingStrikes, 1)
785		},
786	})
787	if !success {
788		if err != nil {
789			return err
790		}
791		t.closeStream(s, true, http2.ErrCodeInternal, false)
792		return ErrHeaderListSizeLimitViolation
793	}
794	if t.stats != nil {
795		// Note: WireLength is not set in outHeader.
796		// TODO(mmukhi): Revisit this later, if needed.
797		outHeader := &stats.OutHeader{}
798		t.stats.HandleRPC(s.Context(), outHeader)
799	}
800	return nil
801}
802
803// WriteStatus sends stream status to the client and terminates the stream.
804// There is no further I/O operations being able to perform on this stream.
805// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
806// OK is adopted.
807func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
808	if s.getState() == streamDone {
809		return nil
810	}
811	s.hdrMu.Lock()
812	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
813	// first and create a slice of that exact size.
814	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
815	if !s.updateHeaderSent() {                      // No headers have been sent.
816		if len(s.header) > 0 { // Send a separate header frame.
817			if err := t.writeHeaderLocked(s); err != nil {
818				s.hdrMu.Unlock()
819				return err
820			}
821		} else { // Send a trailer only response.
822			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
823			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
824		}
825	}
826	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
827	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
828
829	if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
830		stBytes, err := proto.Marshal(p)
831		if err != nil {
832			// TODO: return error instead, when callers are able to handle it.
833			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
834		} else {
835			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
836		}
837	}
838
839	// Attach the trailer metadata.
840	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
841	trailingHeader := &headerFrame{
842		streamID:  s.id,
843		hf:        headerFields,
844		endStream: true,
845		onWrite: func() {
846			atomic.StoreUint32(&t.resetPingStrikes, 1)
847		},
848	}
849	s.hdrMu.Unlock()
850	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
851	if !success {
852		if err != nil {
853			return err
854		}
855		t.closeStream(s, true, http2.ErrCodeInternal, false)
856		return ErrHeaderListSizeLimitViolation
857	}
858	// Send a RST_STREAM after the trailers if the client has not already half-closed.
859	rst := s.getState() == streamActive
860	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
861	if t.stats != nil {
862		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
863	}
864	return nil
865}
866
867// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
868// is returns if it fails (e.g., framing error, transport error).
869func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
870	if !s.isHeaderSent() { // Headers haven't been written yet.
871		if err := t.WriteHeader(s, nil); err != nil {
872			if _, ok := err.(ConnectionError); ok {
873				return err
874			}
875			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
876			return status.Errorf(codes.Internal, "transport: %v", err)
877		}
878	} else {
879		// Writing headers checks for this condition.
880		if s.getState() == streamDone {
881			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
882			s.cancel()
883			select {
884			case <-t.ctx.Done():
885				return ErrConnClosing
886			default:
887			}
888			return ContextErr(s.ctx.Err())
889		}
890	}
891	// Add some data to header frame so that we can equally distribute bytes across frames.
892	emptyLen := http2MaxFrameLen - len(hdr)
893	if emptyLen > len(data) {
894		emptyLen = len(data)
895	}
896	hdr = append(hdr, data[:emptyLen]...)
897	data = data[emptyLen:]
898	df := &dataFrame{
899		streamID: s.id,
900		h:        hdr,
901		d:        data,
902		onEachWrite: func() {
903			atomic.StoreUint32(&t.resetPingStrikes, 1)
904		},
905	}
906	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
907		select {
908		case <-t.ctx.Done():
909			return ErrConnClosing
910		default:
911		}
912		return ContextErr(s.ctx.Err())
913	}
914	return t.controlBuf.put(df)
915}
916
917// keepalive running in a separate goroutine does the following:
918// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
919// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
920// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
921// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
922// after an additional duration of keepalive.Timeout.
923func (t *http2Server) keepalive() {
924	p := &ping{}
925	var pingSent bool
926	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
927	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
928	keepalive := time.NewTimer(t.kp.Time)
929	// NOTE: All exit paths of this function should reset their
930	// respective timers. A failure to do so will cause the
931	// following clean-up to deadlock and eventually leak.
932	defer func() {
933		if !maxIdle.Stop() {
934			<-maxIdle.C
935		}
936		if !maxAge.Stop() {
937			<-maxAge.C
938		}
939		if !keepalive.Stop() {
940			<-keepalive.C
941		}
942	}()
943	for {
944		select {
945		case <-maxIdle.C:
946			t.mu.Lock()
947			idle := t.idle
948			if idle.IsZero() { // The connection is non-idle.
949				t.mu.Unlock()
950				maxIdle.Reset(t.kp.MaxConnectionIdle)
951				continue
952			}
953			val := t.kp.MaxConnectionIdle - time.Since(idle)
954			t.mu.Unlock()
955			if val <= 0 {
956				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
957				// Gracefully close the connection.
958				t.drain(http2.ErrCodeNo, []byte{})
959				// Resetting the timer so that the clean-up doesn't deadlock.
960				maxIdle.Reset(infinity)
961				return
962			}
963			maxIdle.Reset(val)
964		case <-maxAge.C:
965			t.drain(http2.ErrCodeNo, []byte{})
966			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
967			select {
968			case <-maxAge.C:
969				// Close the connection after grace period.
970				t.Close()
971				// Resetting the timer so that the clean-up doesn't deadlock.
972				maxAge.Reset(infinity)
973			case <-t.ctx.Done():
974			}
975			return
976		case <-keepalive.C:
977			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
978				pingSent = false
979				keepalive.Reset(t.kp.Time)
980				continue
981			}
982			if pingSent {
983				t.Close()
984				// Resetting the timer so that the clean-up doesn't deadlock.
985				keepalive.Reset(infinity)
986				return
987			}
988			pingSent = true
989			if channelz.IsOn() {
990				atomic.AddInt64(&t.czData.kpCount, 1)
991			}
992			t.controlBuf.put(p)
993			keepalive.Reset(t.kp.Timeout)
994		case <-t.ctx.Done():
995			return
996		}
997	}
998}
999
1000// Close starts shutting down the http2Server transport.
1001// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1002// could cause some resource issue. Revisit this later.
1003func (t *http2Server) Close() error {
1004	t.mu.Lock()
1005	if t.state == closing {
1006		t.mu.Unlock()
1007		return errors.New("transport: Close() was already called")
1008	}
1009	t.state = closing
1010	streams := t.activeStreams
1011	t.activeStreams = nil
1012	t.mu.Unlock()
1013	t.controlBuf.finish()
1014	t.cancel()
1015	err := t.conn.Close()
1016	if channelz.IsOn() {
1017		channelz.RemoveEntry(t.channelzID)
1018	}
1019	// Cancel all active streams.
1020	for _, s := range streams {
1021		s.cancel()
1022	}
1023	if t.stats != nil {
1024		connEnd := &stats.ConnEnd{}
1025		t.stats.HandleConn(t.ctx, connEnd)
1026	}
1027	return err
1028}
1029
1030// deleteStream deletes the stream s from transport's active streams.
1031func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1032	// In case stream sending and receiving are invoked in separate
1033	// goroutines (e.g., bi-directional streaming), cancel needs to be
1034	// called to interrupt the potential blocking on other goroutines.
1035	s.cancel()
1036
1037	t.mu.Lock()
1038	if _, ok := t.activeStreams[s.id]; ok {
1039		delete(t.activeStreams, s.id)
1040		if len(t.activeStreams) == 0 {
1041			t.idle = time.Now()
1042		}
1043	}
1044	t.mu.Unlock()
1045
1046	if channelz.IsOn() {
1047		if eosReceived {
1048			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1049		} else {
1050			atomic.AddInt64(&t.czData.streamsFailed, 1)
1051		}
1052	}
1053}
1054
1055// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1056func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1057	oldState := s.swapState(streamDone)
1058	if oldState == streamDone {
1059		// If the stream was already done, return.
1060		return
1061	}
1062
1063	hdr.cleanup = &cleanupStream{
1064		streamID: s.id,
1065		rst:      rst,
1066		rstCode:  rstCode,
1067		onWrite: func() {
1068			t.deleteStream(s, eosReceived)
1069		},
1070	}
1071	t.controlBuf.put(hdr)
1072}
1073
1074// closeStream clears the footprint of a stream when the stream is not needed any more.
1075func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1076	s.swapState(streamDone)
1077	t.deleteStream(s, eosReceived)
1078
1079	t.controlBuf.put(&cleanupStream{
1080		streamID: s.id,
1081		rst:      rst,
1082		rstCode:  rstCode,
1083		onWrite:  func() {},
1084	})
1085}
1086
1087func (t *http2Server) RemoteAddr() net.Addr {
1088	return t.remoteAddr
1089}
1090
1091func (t *http2Server) Drain() {
1092	t.drain(http2.ErrCodeNo, []byte{})
1093}
1094
1095func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1096	t.mu.Lock()
1097	defer t.mu.Unlock()
1098	if t.drainChan != nil {
1099		return
1100	}
1101	t.drainChan = make(chan struct{})
1102	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1103}
1104
1105var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1106
1107// Handles outgoing GoAway and returns true if loopy needs to put itself
1108// in draining mode.
1109func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1110	t.mu.Lock()
1111	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1112		t.mu.Unlock()
1113		// The transport is closing.
1114		return false, ErrConnClosing
1115	}
1116	sid := t.maxStreamID
1117	if !g.headsUp {
1118		// Stop accepting more streams now.
1119		t.state = draining
1120		if len(t.activeStreams) == 0 {
1121			g.closeConn = true
1122		}
1123		t.mu.Unlock()
1124		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1125			return false, err
1126		}
1127		if g.closeConn {
1128			// Abruptly close the connection following the GoAway (via
1129			// loopywriter).  But flush out what's inside the buffer first.
1130			t.framer.writer.Flush()
1131			return false, fmt.Errorf("transport: Connection closing")
1132		}
1133		return true, nil
1134	}
1135	t.mu.Unlock()
1136	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1137	// Follow that with a ping and wait for the ack to come back or a timer
1138	// to expire. During this time accept new streams since they might have
1139	// originated before the GoAway reaches the client.
1140	// After getting the ack or timer expiration send out another GoAway this
1141	// time with an ID of the max stream server intends to process.
1142	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1143		return false, err
1144	}
1145	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1146		return false, err
1147	}
1148	go func() {
1149		timer := time.NewTimer(time.Minute)
1150		defer timer.Stop()
1151		select {
1152		case <-t.drainChan:
1153		case <-timer.C:
1154		case <-t.ctx.Done():
1155			return
1156		}
1157		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1158	}()
1159	return false, nil
1160}
1161
1162func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1163	s := channelz.SocketInternalMetric{
1164		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1165		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1166		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1167		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1168		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1169		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1170		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1171		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1172		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1173		LocalFlowControlWindow:           int64(t.fc.getSize()),
1174		SocketOptions:                    channelz.GetSocketOption(t.conn),
1175		LocalAddr:                        t.localAddr,
1176		RemoteAddr:                       t.remoteAddr,
1177		// RemoteName :
1178	}
1179	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1180		s.Security = au.GetSecurityValue()
1181	}
1182	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1183	return &s
1184}
1185
1186func (t *http2Server) IncrMsgSent() {
1187	atomic.AddInt64(&t.czData.msgSent, 1)
1188	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1189}
1190
1191func (t *http2Server) IncrMsgRecv() {
1192	atomic.AddInt64(&t.czData.msgRecv, 1)
1193	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1194}
1195
1196func (t *http2Server) getOutFlowWindow() int64 {
1197	resp := make(chan uint32, 1)
1198	timer := time.NewTimer(time.Second)
1199	defer timer.Stop()
1200	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1201	select {
1202	case sz := <-resp:
1203		return int64(sz)
1204	case <-t.ctxDone:
1205		return -1
1206	case <-timer.C:
1207		return -2
1208	}
1209}
1210
1211func getJitter(v time.Duration) time.Duration {
1212	if v == infinity {
1213		return 0
1214	}
1215	// Generate a jitter between +/- 10% of the value.
1216	r := int64(v / 10)
1217	j := grpcrand.Int63n(2*r) - r
1218	return time.Duration(j)
1219}
1220