1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"context"
24	"errors"
25	"fmt"
26	"io"
27	"math"
28	"net"
29	"strconv"
30	"sync"
31	"sync/atomic"
32	"time"
33
34	"github.com/golang/protobuf/proto"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37
38	spb "google.golang.org/genproto/googleapis/rpc/status"
39	"google.golang.org/grpc/codes"
40	"google.golang.org/grpc/credentials"
41	"google.golang.org/grpc/grpclog"
42	"google.golang.org/grpc/internal"
43	"google.golang.org/grpc/internal/channelz"
44	"google.golang.org/grpc/internal/grpcrand"
45	"google.golang.org/grpc/keepalive"
46	"google.golang.org/grpc/metadata"
47	"google.golang.org/grpc/peer"
48	"google.golang.org/grpc/stats"
49	"google.golang.org/grpc/status"
50	"google.golang.org/grpc/tap"
51)
52
53var (
54	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
55	// the stream's state.
56	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58	// than the limit set by peer.
59	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60	// statusRawProto is a function to get to the raw status proto wrapped in a
61	// status.Status without a proto.Clone().
62	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
63)
64
// http2Server implements the ServerTransport interface with HTTP2.
//
// Goroutines visible in this file: HandleStreams runs the frame-reading loop
// (its doc says it is typically run in a separate goroutine), and
// newHTTP2Server starts one goroutine running the loopyWriter and another
// running keepalive. Fields documented as guarded by mu must only be touched
// with mu held; fields documented as atomic are accessed via sync/atomic.
type http2Server struct {
	// ctx is the connection-level context. Per-stream contexts are derived
	// from it in operateHeaders; when a stats.Handler is configured it is
	// replaced by the context returned from TagConn.
	ctx         context.Context
	// done is handed to the control buffer and selected on by Write and
	// keepalive to observe that the transport is shutting down.
	done        chan struct{}
	conn        net.Conn
	// loopy is assigned inside the writer goroutine started by newHTTP2Server.
	loopy       *loopyWriter
	// readerDone is closed when HandleStreams returns.
	readerDone  chan struct{} // sync point to enable testing.
	// writerDone is closed when the loopy writer goroutine exits.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	// maxStreamID is read and updated under mu in operateHeaders, even
	// though it is declared above mu.
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	// inTapHandle, if non-nil, is called for every new stream and may refuse it.
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	// math.MaxUint32 when ServerConfig.MaxStreams is 0 (treated as no limit).
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow control (credited in handleData).
	fc         *trInFlow
	stats      stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	// Set to 1 for every frame read; keepalive swaps it back to 0 to detect
	// a connection with no inbound traffic.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	// Zero-valued fields are replaced by defaults in newHTTP2Server, and
	// MaxConnectionAge has jitter added there.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instant the last ping was received (only used by handlePing).
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	// initialWindowSize is the inbound window given to newly created streams;
	// updateFlowControl changes it (under mu) when BDP estimation is enabled.
	initialWindowSize     int32
	// bdpEst is nil unless dynamic window sizing is enabled, i.e. neither the
	// stream nor the connection window size was configured.
	bdpEst                *bdpEstimator
	// maxSendHeaderListSize is the client's SETTINGS_MAX_HEADER_LIST_SIZE, or
	// nil if the client has not advertised one. It is written and read only
	// inside functions run through controlBuf.executeAndPut/execute
	// (handleSettings, checkForHeaderListSize).
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	// handlePing closes it when the client acknowledges the GOAWAY ping.
	drainChan     chan struct{}
	state         transportState
	// activeStreams maps stream IDs to live streams. getStream treats a nil
	// map as "the transport is closing".
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time.
	idle time.Time

	// Fields below are for channelz metric collection.
	// NOTE(review): czData counters are updated atomically without mu in this
	// file; these fields do not appear to rely on mu.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	// bufferPool supplies the buffers that incoming DATA payloads are copied
	// into (handleData); stream readers return them through freeBuffer.
	bufferPool *bufferPool
}
128
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong, except that io.EOF and io.ErrUnexpectedEOF
// hit while reading the client's first frame are returned as-is.
//
// Steps: write the server's initial SETTINGS frame (plus a connection-level
// WINDOW_UPDATE when the connection window is larger than
// defaultWindowSize), build the transport, read and validate the client
// connection preface and the client's initial SETTINGS frame, then start the
// loopy writer goroutine and the keepalive goroutine. Once t exists, every
// error return goes through the deferred t.Close() below.
//
// NOTE(review): the two early returns that happen before t is built do not
// close conn here; presumably the caller closes it in that case — confirm.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	// A configured value of 0 is treated as "unlimited" and is therefore not
	// advertised to the client.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// Dynamic (BDP-based) window sizing stays on only when neither the stream
	// nor the connection window is configured; configured values smaller
	// than defaultWindowSize are ignored.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	// The stream window is only advertised when it differs from
	// defaultWindowSize.
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	// NOTE(review): assumes icwz >= defaultWindowSize (true for configured
	// values; otherwise depends on the initialWindowSize constant) — a smaller
	// icwz would wrap around in the uint32 conversion.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Zero-valued keepalive parameters fall back to the package defaults.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	// NOTE(review): presumably so that connections accepted together do not
	// all reach their maximum age at the same moment.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               context.Background(),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}
	// Push the buffered SETTINGS (and WINDOW_UPDATE) frames to the client.
	// NOTE(review): the Flush error is ignored here.
	t.framer.writer.Flush()

	// err is the named result, so this observes the error of every return
	// below and tears down the partially initialized transport.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	// Reading the client's first frame counts as activity for keepalive.
	atomic.StoreUint32(&t.activity, 1)
	// The first frame after the preface must be the client's SETTINGS.
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// The writer goroutine owns loopy; when loopy stops, the connection is
	// closed and writerDone is closed.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
299
// operateHeaders takes action on the decoded headers.
//
// It turns a client HEADERS frame into a new server-side Stream: it decodes
// the gRPC headers, builds the stream context (decoded timeout, peer,
// incoming metadata, stats tags/trace), consults inTapHandle, checks the
// transport state, the concurrent-stream limit and the stream-ID rules,
// registers the stream with loopy and finally hands it to handle.
//
// It returns fatal=true only when the stream ID is illegal (even, or not
// greater than the highest ID seen so far); the caller treats that as a
// connection-level error. Streams rejected for any other reason are dropped,
// in some cases with an RST_STREAM, and false is returned.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	streamID := frame.Header().StreamID
	state := &decodeState{
		serverSide: true,
	}
	if err := state.decodeHeader(frame); err != nil {
		// Only decode errors that carry a gRPC status are answered with an
		// RST_STREAM (code mapped through statusCodeConvTab); others just drop
		// the frame.
		if se, ok := status.FromError(err); ok {
			t.controlBuf.put(&cleanupStream{
				streamID: streamID,
				rst:      true,
				rstCode:  statusCodeConvTab[se.Code()],
				onWrite:  func() {},
			})
		}
		return false
	}

	buf := newRecvBuffer()
	s := &Stream{
		id:             streamID,
		st:             t,
		buf:            buf,
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	// A client-supplied timeout becomes the stream context's deadline.
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Attach the received metadata to the context.
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	if state.data.statsTags != nil {
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
	}
	if state.data.statsTrace != nil {
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
	}
	// The tap handle may replace the context or refuse the stream.
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.data.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			t.controlBuf.put(&cleanupStream{
				streamID: s.id,
				rst:      true,
				rstCode:  http2.ErrCodeRefusedStream,
				onWrite:  func() {},
			})
			s.cancel()
			return false
		}
	}
	t.mu.Lock()
	// The transport no longer accepts streams: drop this one silently.
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return false
	}
	// Concurrent-stream limit reached: refuse with REFUSED_STREAM.
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return false
	}
	// Client-initiated streams must use odd, strictly increasing IDs.
	if streamID%2 != 1 || streamID <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
		s.cancel()
		return true
	}
	t.maxStreamID = streamID
	t.activeStreams[streamID] = s
	// The first active stream ends the idle period tracked by keepalive.
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	// ctxDone is captured only after all context wrapping above is finished.
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return false
}
443
444// HandleStreams receives incoming streams using the given handler. This is
445// typically run in a separate goroutine.
446// traceCtx attaches trace to ctx and returns the new context.
447func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
448	defer close(t.readerDone)
449	for {
450		t.controlBuf.throttle()
451		frame, err := t.framer.fr.ReadFrame()
452		atomic.StoreUint32(&t.activity, 1)
453		if err != nil {
454			if se, ok := err.(http2.StreamError); ok {
455				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
456				t.mu.Lock()
457				s := t.activeStreams[se.StreamID]
458				t.mu.Unlock()
459				if s != nil {
460					t.closeStream(s, true, se.Code, false)
461				} else {
462					t.controlBuf.put(&cleanupStream{
463						streamID: se.StreamID,
464						rst:      true,
465						rstCode:  se.Code,
466						onWrite:  func() {},
467					})
468				}
469				continue
470			}
471			if err == io.EOF || err == io.ErrUnexpectedEOF {
472				t.Close()
473				return
474			}
475			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
476			t.Close()
477			return
478		}
479		switch frame := frame.(type) {
480		case *http2.MetaHeadersFrame:
481			if t.operateHeaders(frame, handle, traceCtx) {
482				t.Close()
483				break
484			}
485		case *http2.DataFrame:
486			t.handleData(frame)
487		case *http2.RSTStreamFrame:
488			t.handleRSTStream(frame)
489		case *http2.SettingsFrame:
490			t.handleSettings(frame)
491		case *http2.PingFrame:
492			t.handlePing(frame)
493		case *http2.WindowUpdateFrame:
494			t.handleWindowUpdate(frame)
495		case *http2.GoAwayFrame:
496			// TODO: Handle GoAway from the client appropriately.
497		default:
498			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
499		}
500	}
501}
502
503func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
504	t.mu.Lock()
505	defer t.mu.Unlock()
506	if t.activeStreams == nil {
507		// The transport is closing.
508		return nil, false
509	}
510	s, ok := t.activeStreams[f.Header().StreamID]
511	if !ok {
512		// The stream is already done.
513		return nil, false
514	}
515	return s, true
516}
517
518// adjustWindow sends out extra window update over the initial window size
519// of stream if the application is requesting data larger in size than
520// the window.
521func (t *http2Server) adjustWindow(s *Stream, n uint32) {
522	if w := s.fc.maybeAdjust(n); w > 0 {
523		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
524	}
525
526}
527
528// updateWindow adjusts the inbound quota for the stream and the transport.
529// Window updates will deliver to the controller for sending when
530// the cumulative quota exceeds the corresponding threshold.
531func (t *http2Server) updateWindow(s *Stream, n uint32) {
532	if w := s.fc.onRead(n); w > 0 {
533		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
534			increment: w,
535		})
536	}
537}
538
539// updateFlowControl updates the incoming flow control windows
540// for the transport and the stream based on the current bdp
541// estimation.
542func (t *http2Server) updateFlowControl(n uint32) {
543	t.mu.Lock()
544	for _, s := range t.activeStreams {
545		s.fc.newLimit(n)
546	}
547	t.initialWindowSize = int32(n)
548	t.mu.Unlock()
549	t.controlBuf.put(&outgoingWindowUpdate{
550		streamID:  0,
551		increment: t.fc.newLimit(n),
552	})
553	t.controlBuf.put(&outgoingSettings{
554		ss: []http2.Setting{
555			{
556				ID:  http2.SettingInitialWindowSize,
557				Val: n,
558			},
559		},
560	})
561
562}
563
// handleData processes a DATA frame from the client.
//
// The connection-level window is credited immediately, independent of the
// application reading, and a BDP ping is queued when the estimator asks for
// one. The payload is then copied into a pooled buffer and delivered to the
// target stream. A stream-level flow-control violation resets the stream
// with FLOW_CONTROL_ERROR. END_STREAM marks the stream read-done and delivers
// io.EOF to its reader. Frames for unknown or finished streams only affect
// connection-level flow control.
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		// Padding counts against the stream window but is never read by
		// the application, so it is credited back right away.
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}
626
627func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
628	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
629	if s, ok := t.getStream(f); ok {
630		t.closeStream(s, false, 0, false)
631		return
632	}
633	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
634	t.controlBuf.put(&cleanupStream{
635		streamID: f.Header().StreamID,
636		rst:      false,
637		rstCode:  0,
638		onWrite:  func() {},
639	})
640}
641
642func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
643	if f.IsAck() {
644		return
645	}
646	var ss []http2.Setting
647	var updateFuncs []func()
648	f.ForeachSetting(func(s http2.Setting) error {
649		switch s.ID {
650		case http2.SettingMaxHeaderListSize:
651			updateFuncs = append(updateFuncs, func() {
652				t.maxSendHeaderListSize = new(uint32)
653				*t.maxSendHeaderListSize = s.Val
654			})
655		default:
656			ss = append(ss, s)
657		}
658		return nil
659	})
660	t.controlBuf.executeAndPut(func(interface{}) bool {
661		for _, f := range updateFuncs {
662			f()
663		}
664		return true
665	}, &incomingSettings{
666		ss: ss,
667	})
668}
669
// maxPingStrikes is the number of keepalive-policy violations handlePing
// tolerates; the next one triggers a GOAWAY with ENHANCE_YOUR_CALM.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing handlePing expects between client
// pings when no stream is active and pings without streams are not permitted.
const defaultPingTimeout = 2 * time.Hour
674
675func (t *http2Server) handlePing(f *http2.PingFrame) {
676	if f.IsAck() {
677		if f.Data == goAwayPing.data && t.drainChan != nil {
678			close(t.drainChan)
679			return
680		}
681		// Maybe it's a BDP ping.
682		if t.bdpEst != nil {
683			t.bdpEst.calculate(f.Data)
684		}
685		return
686	}
687	pingAck := &ping{ack: true}
688	copy(pingAck.data[:], f.Data[:])
689	t.controlBuf.put(pingAck)
690
691	now := time.Now()
692	defer func() {
693		t.lastPingAt = now
694	}()
695	// A reset ping strikes means that we don't need to check for policy
696	// violation for this ping and the pingStrikes counter should be set
697	// to 0.
698	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
699		t.pingStrikes = 0
700		return
701	}
702	t.mu.Lock()
703	ns := len(t.activeStreams)
704	t.mu.Unlock()
705	if ns < 1 && !t.kep.PermitWithoutStream {
706		// Keepalive shouldn't be active thus, this new ping should
707		// have come after at least defaultPingTimeout.
708		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
709			t.pingStrikes++
710		}
711	} else {
712		// Check if keepalive policy is respected.
713		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
714			t.pingStrikes++
715		}
716	}
717
718	if t.pingStrikes > maxPingStrikes {
719		// Send goaway and close the connection.
720		errorf("transport: Got too many pings from the client, closing the connection.")
721		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
722	}
723}
724
725func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
726	t.controlBuf.put(&incomingWindowUpdate{
727		streamID:  f.Header().StreamID,
728		increment: f.Increment,
729	})
730}
731
732func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
733	for k, vv := range md {
734		if isReservedHeader(k) {
735			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
736			continue
737		}
738		for _, v := range vv {
739			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
740		}
741	}
742	return headerFields
743}
744
745func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
746	if t.maxSendHeaderListSize == nil {
747		return true
748	}
749	hdrFrame := it.(*headerFrame)
750	var sz int64
751	for _, f := range hdrFrame.hf {
752		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
753			errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
754			return false
755		}
756	}
757	return true
758}
759
760// WriteHeader sends the header metadata md back to the client.
761func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
762	if s.updateHeaderSent() || s.getState() == streamDone {
763		return ErrIllegalHeaderWrite
764	}
765	s.hdrMu.Lock()
766	if md.Len() > 0 {
767		if s.header.Len() > 0 {
768			s.header = metadata.Join(s.header, md)
769		} else {
770			s.header = md
771		}
772	}
773	if err := t.writeHeaderLocked(s); err != nil {
774		s.hdrMu.Unlock()
775		return err
776	}
777	s.hdrMu.Unlock()
778	return nil
779}
780
// setResetPingStrikes records that a header or data frame was written, so
// that the next client ping handled by handlePing clears pingStrikes instead
// of being checked against the keepalive enforcement policy. It is installed
// as the onWrite/onEachWrite callback of the outgoing header and data frames
// built in this file.
func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}
784
785func (t *http2Server) writeHeaderLocked(s *Stream) error {
786	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
787	// first and create a slice of that exact size.
788	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
789	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
790	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
791	if s.sendCompress != "" {
792		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
793	}
794	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
795	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
796		streamID:  s.id,
797		hf:        headerFields,
798		endStream: false,
799		onWrite:   t.setResetPingStrikes,
800	})
801	if !success {
802		if err != nil {
803			return err
804		}
805		t.closeStream(s, true, http2.ErrCodeInternal, false)
806		return ErrHeaderListSizeLimitViolation
807	}
808	if t.stats != nil {
809		// Note: WireLength is not set in outHeader.
810		// TODO(mmukhi): Revisit this later, if needed.
811		outHeader := &stats.OutHeader{}
812		t.stats.HandleRPC(s.Context(), outHeader)
813	}
814	return nil
815}
816
// WriteStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream afterwards.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
//
// If no response headers have been sent yet, any stored header metadata is
// sent first in its own HEADERS frame. Otherwise ":status" and
// "content-type" are folded into the trailers, producing a trailers-only
// response. The trailers end the stream and carry grpc-status, grpc-message,
// grpc-status-details-bin (only when the status has details that marshal
// successfully) and the stream's trailer metadata.
//
// It returns nil without writing anything if the stream is already done. If
// the trailers exceed the client's header list size limit, the stream is
// closed with an RST (INTERNAL_ERROR) and ErrHeaderListSizeLimitViolation is
// returned.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	s.hdrMu.Lock()
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// statusRawProto avoids cloning the status proto just to inspect Details.
	if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}
	// hdrMu is released before the trailers are handed to controlBuf.
	s.hdrMu.Unlock()
	// Only the size check runs here (execute, not executeAndPut).
	// NOTE(review): presumably the trailers are queued by finishStream, which
	// receives trailingHeader below — confirm.
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
	}
	return nil
}
878
879// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
880// is returns if it fails (e.g., framing error, transport error).
881func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
882	if !s.isHeaderSent() { // Headers haven't been written yet.
883		if err := t.WriteHeader(s, nil); err != nil {
884			if _, ok := err.(ConnectionError); ok {
885				return err
886			}
887			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
888			return status.Errorf(codes.Internal, "transport: %v", err)
889		}
890	} else {
891		// Writing headers checks for this condition.
892		if s.getState() == streamDone {
893			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
894			s.cancel()
895			select {
896			case <-t.done:
897				return ErrConnClosing
898			default:
899			}
900			return ContextErr(s.ctx.Err())
901		}
902	}
903	// Add some data to header frame so that we can equally distribute bytes across frames.
904	emptyLen := http2MaxFrameLen - len(hdr)
905	if emptyLen > len(data) {
906		emptyLen = len(data)
907	}
908	hdr = append(hdr, data[:emptyLen]...)
909	data = data[emptyLen:]
910	df := &dataFrame{
911		streamID:    s.id,
912		h:           hdr,
913		d:           data,
914		onEachWrite: t.setResetPingStrikes,
915	}
916	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
917		select {
918		case <-t.done:
919			return ErrConnClosing
920		default:
921		}
922		return ContextErr(s.ctx.Err())
923	}
924	return t.controlBuf.put(df)
925}
926
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// "Gracefully" here means starting t.drain(http2.ErrCodeNo, ...);
// "forcibly" means t.Close(). The loop returns after starting an idle
// drain, after the max-age path completes, after closing an unresponsive
// connection, or when t.done is closed.
func (t *http2Server) keepalive() {
	p := &ping{}
	// pingSent records that a keepalive ping is outstanding with no read
	// activity observed since it was sent.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			// Idle, but not for long enough yet: wait for the remainder.
			maxIdle.Reset(val)
		case <-maxAge.C:
			// Start a graceful drain, then allow MaxConnectionAgeGrace for
			// in-flight RPCs before forcibly closing.
			t.drain(http2.ErrCodeNo, []byte{})
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				infof("transport: closing server transport due to maximum connection age.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.done:
			}
			return
		case <-keepalive.C:
			// A frame was read since the previous check: the client is alive,
			// so clear pingSent and wait a full keepalive.Time again.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			// A ping is outstanding and nothing was read within kp.Timeout:
			// treat the client as unresponsive.
			if pingSent {
				infof("transport: closing server transport due to idleness.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			// Nothing read since the previous check: send a ping and give the
			// client kp.Timeout to produce any activity.
			pingSent = true
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.kpCount, 1)
			}
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.done:
			return
		}
	}
}
1011
1012// Close starts shutting down the http2Server transport.
1013// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1014// could cause some resource issue. Revisit this later.
1015func (t *http2Server) Close() error {
1016	t.mu.Lock()
1017	if t.state == closing {
1018		t.mu.Unlock()
1019		return errors.New("transport: Close() was already called")
1020	}
1021	t.state = closing
1022	streams := t.activeStreams
1023	t.activeStreams = nil
1024	t.mu.Unlock()
1025	t.controlBuf.finish()
1026	close(t.done)
1027	err := t.conn.Close()
1028	if channelz.IsOn() {
1029		channelz.RemoveEntry(t.channelzID)
1030	}
1031	// Cancel all active streams.
1032	for _, s := range streams {
1033		s.cancel()
1034	}
1035	if t.stats != nil {
1036		connEnd := &stats.ConnEnd{}
1037		t.stats.HandleConn(t.ctx, connEnd)
1038	}
1039	return err
1040}
1041
1042// deleteStream deletes the stream s from transport's active streams.
1043func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1044	// In case stream sending and receiving are invoked in separate
1045	// goroutines (e.g., bi-directional streaming), cancel needs to be
1046	// called to interrupt the potential blocking on other goroutines.
1047	s.cancel()
1048
1049	t.mu.Lock()
1050	if _, ok := t.activeStreams[s.id]; ok {
1051		delete(t.activeStreams, s.id)
1052		if len(t.activeStreams) == 0 {
1053			t.idle = time.Now()
1054		}
1055	}
1056	t.mu.Unlock()
1057
1058	if channelz.IsOn() {
1059		if eosReceived {
1060			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1061		} else {
1062			atomic.AddInt64(&t.czData.streamsFailed, 1)
1063		}
1064	}
1065}
1066
1067// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1068func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1069	oldState := s.swapState(streamDone)
1070	if oldState == streamDone {
1071		// If the stream was already done, return.
1072		return
1073	}
1074
1075	hdr.cleanup = &cleanupStream{
1076		streamID: s.id,
1077		rst:      rst,
1078		rstCode:  rstCode,
1079		onWrite: func() {
1080			t.deleteStream(s, eosReceived)
1081		},
1082	}
1083	t.controlBuf.put(hdr)
1084}
1085
1086// closeStream clears the footprint of a stream when the stream is not needed any more.
1087func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1088	s.swapState(streamDone)
1089	t.deleteStream(s, eosReceived)
1090
1091	t.controlBuf.put(&cleanupStream{
1092		streamID: s.id,
1093		rst:      rst,
1094		rstCode:  rstCode,
1095		onWrite:  func() {},
1096	})
1097}
1098
1099func (t *http2Server) RemoteAddr() net.Addr {
1100	return t.remoteAddr
1101}
1102
1103func (t *http2Server) Drain() {
1104	t.drain(http2.ErrCodeNo, []byte{})
1105}
1106
1107func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1108	t.mu.Lock()
1109	defer t.mu.Unlock()
1110	if t.drainChan != nil {
1111		return
1112	}
1113	t.drainChan = make(chan struct{})
1114	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1115}
1116
1117var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1118
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
//
// outgoingGoAwayHandler processes a goAway control item in one of two modes.
//
// With g.headsUp == true (as queued by drain):
//   - It writes a GoAway with stream ID math.MaxUint32, followed by goAwayPing.
//   - It then starts a goroutine that queues the final (headsUp == false) GoAway
//     with the same code and debug data once t.drainChan is closed or one
//     minute passes. It queues nothing if t.done is closed first.
//   - It returns (false, nil).
//
// With g.headsUp == false:
//   - It moves the transport to the draining state, so no new streams are
//     accepted, and writes a GoAway carrying t.maxStreamID.
//   - If no streams are active, it flushes and returns a non-nil error so the
//     connection is closed.
//   - Otherwise it returns (true, nil).
//
// It returns ErrConnClosing if the transport is already closing. It also
// returns any error from writing the frames.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	// Snapshot the highest stream ID seen so far while still holding t.mu.
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter).  But flush out what's inside the buffer first.
			// The Flush error is not checked; an error is returned either way.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	// NOTE(review): HTTP/2 stream IDs are 31 bits; x/net/http2's WriteGoAway
	// presumably masks MaxUint32 down to 2^31-1 (RFC 7540 §6.8); confirm.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		// t.drainChan was created by drain before the heads-up goAway was
		// queued. It is presumably closed when the ack for goAwayPing arrives
		// (handled elsewhere; TODO confirm).
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1173
1174func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1175	s := channelz.SocketInternalMetric{
1176		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1177		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1178		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1179		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1180		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1181		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1182		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1183		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1184		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1185		LocalFlowControlWindow:           int64(t.fc.getSize()),
1186		SocketOptions:                    channelz.GetSocketOption(t.conn),
1187		LocalAddr:                        t.localAddr,
1188		RemoteAddr:                       t.remoteAddr,
1189		// RemoteName :
1190	}
1191	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1192		s.Security = au.GetSecurityValue()
1193	}
1194	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1195	return &s
1196}
1197
1198func (t *http2Server) IncrMsgSent() {
1199	atomic.AddInt64(&t.czData.msgSent, 1)
1200	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1201}
1202
1203func (t *http2Server) IncrMsgRecv() {
1204	atomic.AddInt64(&t.czData.msgRecv, 1)
1205	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1206}
1207
1208func (t *http2Server) getOutFlowWindow() int64 {
1209	resp := make(chan uint32, 1)
1210	timer := time.NewTimer(time.Second)
1211	defer timer.Stop()
1212	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1213	select {
1214	case sz := <-resp:
1215		return int64(sz)
1216	case <-t.done:
1217		return -1
1218	case <-timer.C:
1219		return -2
1220	}
1221}
1222
1223func getJitter(v time.Duration) time.Duration {
1224	if v == infinity {
1225		return 0
1226	}
1227	// Generate a jitter between +/- 10% of the value.
1228	r := int64(v / 10)
1229	j := grpcrand.Int63n(2*r) - r
1230	return time.Duration(j)
1231}
1232