1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"context"
24	"errors"
25	"fmt"
26	"io"
27	"math"
28	"net"
29	"strconv"
30	"sync"
31	"sync/atomic"
32	"time"
33
34	"github.com/golang/protobuf/proto"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37
38	spb "google.golang.org/genproto/googleapis/rpc/status"
39	"google.golang.org/grpc/codes"
40	"google.golang.org/grpc/credentials"
41	"google.golang.org/grpc/grpclog"
42	"google.golang.org/grpc/internal"
43	"google.golang.org/grpc/internal/channelz"
44	"google.golang.org/grpc/internal/grpcrand"
45	"google.golang.org/grpc/keepalive"
46	"google.golang.org/grpc/metadata"
47	"google.golang.org/grpc/peer"
48	"google.golang.org/grpc/stats"
49	"google.golang.org/grpc/status"
50	"google.golang.org/grpc/tap"
51)
52
var (
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
	// the stream's state. WriteHeader returns it when the header was already
	// marked as sent or the stream is in the streamDone state.
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
	// than the limit set by peer.
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
	// statusRawProto is a function to get to the raw status proto wrapped in a
	// status.Status without a proto.Clone().
	// The type assertion is unchecked, so package initialization panics if
	// internal.StatusRawProto does not hold a function of exactly this type.
	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
)
64
// http2Server implements the ServerTransport interface with HTTP2.
//
// One http2Server exists per accepted connection. Frames are read by
// HandleStreams; outgoing work is queued on controlBuf and written by the
// loopyWriter goroutine started in newHTTP2Server. The fields below mu are
// guarded by mu.
type http2Server struct {
	ctx         context.Context
	ctxDone     <-chan struct{} // Cache the context.Done() chan
	cancel      context.CancelFunc
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams.
	// newHTTP2Server stores math.MaxUint32 here when the configured value is 0.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	// It is set to 1 after every ReadFrame call and cleared by keepalive.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time
	// (idle.IsZero() reports true).
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool
}
129
// newHTTP2Server constructs a ServerTransport based on HTTP2.
//
// It writes the server's SETTINGS (and, if needed, a connection-level
// WINDOW_UPDATE), validates the client preface, consumes the client's initial
// SETTINGS frame and then starts the loopy writer and keepalive goroutines.
//
// Most failures are reported through connectionErrorf; however, io.EOF and
// io.ErrUnexpectedEOF encountered while reading the first frame are returned
// unwrapped. Once the transport value has been built, any error return also
// closes it via the deferred t.Close().
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		// 0 means "unlimited": nothing is advertised and the local cap is
		// the largest representable value.
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// The window is sized dynamically (BDP estimation) unless the caller
	// configured a stream or connection window of at least defaultWindowSize.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in defaults for every keepalive parameter left at its zero value.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &http2Server{
		ctx:               ctx,
		cancel:            cancel,
		ctxDone:           ctx.Done(),
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		// t.ctx is replaced by the tagged context; t.ctxDone keeps the Done
		// channel of the context created above.
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}
	// Push the frames written above to the connection. The Flush result is
	// not checked here.
	t.framer.writer.Flush()

	// From here on, every error return tears the transport down. The named
	// result err is what the deferred function inspects.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS frame.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		// Returned as-is (not wrapped in a ConnectionError).
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Writer goroutine: runs loopy until it returns, then closes the
	// connection and signals writerDone.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
295
296// operateHeader takes action on the decoded headers.
297func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
298	streamID := frame.Header().StreamID
299	state := &decodeState{
300		serverSide: true,
301	}
302	if err := state.decodeHeader(frame); err != nil {
303		if se, ok := status.FromError(err); ok {
304			t.controlBuf.put(&cleanupStream{
305				streamID: streamID,
306				rst:      true,
307				rstCode:  statusCodeConvTab[se.Code()],
308				onWrite:  func() {},
309			})
310		}
311		return false
312	}
313
314	buf := newRecvBuffer()
315	s := &Stream{
316		id:             streamID,
317		st:             t,
318		buf:            buf,
319		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
320		recvCompress:   state.data.encoding,
321		method:         state.data.method,
322		contentSubtype: state.data.contentSubtype,
323	}
324	if frame.StreamEnded() {
325		// s is just created by the caller. No lock needed.
326		s.state = streamReadDone
327	}
328	if state.data.timeoutSet {
329		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
330	} else {
331		s.ctx, s.cancel = context.WithCancel(t.ctx)
332	}
333	pr := &peer.Peer{
334		Addr: t.remoteAddr,
335	}
336	// Attach Auth info if there is any.
337	if t.authInfo != nil {
338		pr.AuthInfo = t.authInfo
339	}
340	s.ctx = peer.NewContext(s.ctx, pr)
341	// Attach the received metadata to the context.
342	if len(state.data.mdata) > 0 {
343		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
344	}
345	if state.data.statsTags != nil {
346		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
347	}
348	if state.data.statsTrace != nil {
349		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
350	}
351	if t.inTapHandle != nil {
352		var err error
353		info := &tap.Info{
354			FullMethodName: state.data.method,
355		}
356		s.ctx, err = t.inTapHandle(s.ctx, info)
357		if err != nil {
358			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
359			t.controlBuf.put(&cleanupStream{
360				streamID: s.id,
361				rst:      true,
362				rstCode:  http2.ErrCodeRefusedStream,
363				onWrite:  func() {},
364			})
365			return false
366		}
367	}
368	t.mu.Lock()
369	if t.state != reachable {
370		t.mu.Unlock()
371		return false
372	}
373	if uint32(len(t.activeStreams)) >= t.maxStreams {
374		t.mu.Unlock()
375		t.controlBuf.put(&cleanupStream{
376			streamID: streamID,
377			rst:      true,
378			rstCode:  http2.ErrCodeRefusedStream,
379			onWrite:  func() {},
380		})
381		return false
382	}
383	if streamID%2 != 1 || streamID <= t.maxStreamID {
384		t.mu.Unlock()
385		// illegal gRPC stream id.
386		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
387		return true
388	}
389	t.maxStreamID = streamID
390	t.activeStreams[streamID] = s
391	if len(t.activeStreams) == 1 {
392		t.idle = time.Time{}
393	}
394	t.mu.Unlock()
395	if channelz.IsOn() {
396		atomic.AddInt64(&t.czData.streamsStarted, 1)
397		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
398	}
399	s.requestRead = func(n int) {
400		t.adjustWindow(s, uint32(n))
401	}
402	s.ctx = traceCtx(s.ctx, s.method)
403	if t.stats != nil {
404		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
405		inHeader := &stats.InHeader{
406			FullMethod:  s.method,
407			RemoteAddr:  t.remoteAddr,
408			LocalAddr:   t.localAddr,
409			Compression: s.recvCompress,
410			WireLength:  int(frame.Header().Length),
411		}
412		t.stats.HandleRPC(s.ctx, inHeader)
413	}
414	s.ctxDone = s.ctx.Done()
415	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
416	s.trReader = &transportReader{
417		reader: &recvBufferReader{
418			ctx:        s.ctx,
419			ctxDone:    s.ctxDone,
420			recv:       s.buf,
421			freeBuffer: t.bufferPool.put,
422		},
423		windowHandler: func(n int) {
424			t.updateWindow(s, uint32(n))
425		},
426	}
427	// Register the stream with loopy.
428	t.controlBuf.put(&registerStream{
429		streamID: s.id,
430		wq:       s.wq,
431	})
432	handle(s)
433	return false
434}
435
436// HandleStreams receives incoming streams using the given handler. This is
437// typically run in a separate goroutine.
438// traceCtx attaches trace to ctx and returns the new context.
439func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
440	defer close(t.readerDone)
441	for {
442		t.controlBuf.throttle()
443		frame, err := t.framer.fr.ReadFrame()
444		atomic.StoreUint32(&t.activity, 1)
445		if err != nil {
446			if se, ok := err.(http2.StreamError); ok {
447				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
448				t.mu.Lock()
449				s := t.activeStreams[se.StreamID]
450				t.mu.Unlock()
451				if s != nil {
452					t.closeStream(s, true, se.Code, false)
453				} else {
454					t.controlBuf.put(&cleanupStream{
455						streamID: se.StreamID,
456						rst:      true,
457						rstCode:  se.Code,
458						onWrite:  func() {},
459					})
460				}
461				continue
462			}
463			if err == io.EOF || err == io.ErrUnexpectedEOF {
464				t.Close()
465				return
466			}
467			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
468			t.Close()
469			return
470		}
471		switch frame := frame.(type) {
472		case *http2.MetaHeadersFrame:
473			if t.operateHeaders(frame, handle, traceCtx) {
474				t.Close()
475				break
476			}
477		case *http2.DataFrame:
478			t.handleData(frame)
479		case *http2.RSTStreamFrame:
480			t.handleRSTStream(frame)
481		case *http2.SettingsFrame:
482			t.handleSettings(frame)
483		case *http2.PingFrame:
484			t.handlePing(frame)
485		case *http2.WindowUpdateFrame:
486			t.handleWindowUpdate(frame)
487		case *http2.GoAwayFrame:
488			// TODO: Handle GoAway from the client appropriately.
489		default:
490			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
491		}
492	}
493}
494
495func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
496	t.mu.Lock()
497	defer t.mu.Unlock()
498	if t.activeStreams == nil {
499		// The transport is closing.
500		return nil, false
501	}
502	s, ok := t.activeStreams[f.Header().StreamID]
503	if !ok {
504		// The stream is already done.
505		return nil, false
506	}
507	return s, true
508}
509
510// adjustWindow sends out extra window update over the initial window size
511// of stream if the application is requesting data larger in size than
512// the window.
513func (t *http2Server) adjustWindow(s *Stream, n uint32) {
514	if w := s.fc.maybeAdjust(n); w > 0 {
515		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
516	}
517
518}
519
520// updateWindow adjusts the inbound quota for the stream and the transport.
521// Window updates will deliver to the controller for sending when
522// the cumulative quota exceeds the corresponding threshold.
523func (t *http2Server) updateWindow(s *Stream, n uint32) {
524	if w := s.fc.onRead(n); w > 0 {
525		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
526			increment: w,
527		})
528	}
529}
530
531// updateFlowControl updates the incoming flow control windows
532// for the transport and the stream based on the current bdp
533// estimation.
534func (t *http2Server) updateFlowControl(n uint32) {
535	t.mu.Lock()
536	for _, s := range t.activeStreams {
537		s.fc.newLimit(n)
538	}
539	t.initialWindowSize = int32(n)
540	t.mu.Unlock()
541	t.controlBuf.put(&outgoingWindowUpdate{
542		streamID:  0,
543		increment: t.fc.newLimit(n),
544	})
545	t.controlBuf.put(&outgoingSettings{
546		ss: []http2.Setting{
547			{
548				ID:  http2.SettingInitialWindowSize,
549				Val: n,
550			},
551		},
552	})
553
554}
555
// handleData processes an incoming DATA frame: it feeds the BDP estimator,
// performs connection-level and stream-level flow-control accounting, copies
// the payload into the stream's receive buffer, and signals end-of-stream
// when the frame carries the END_STREAM flag.
//
// Connection-level accounting happens before the stream lookup, so frames
// for streams that are no longer active still count against the connection
// window.
func (t *http2Server) handleData(f *http2.DataFrame) {
	// size is the frame length from the header; for padded frames it is
	// larger than len(f.Data()).
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if size > 0 {
		// A stream-level flow-control violation resets the stream.
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			// The bytes that are not payload never reach the application,
			// so they are reported as read right away.
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}
618
619func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
620	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
621	if s, ok := t.getStream(f); ok {
622		t.closeStream(s, false, 0, false)
623		return
624	}
625	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
626	t.controlBuf.put(&cleanupStream{
627		streamID: f.Header().StreamID,
628		rst:      false,
629		rstCode:  0,
630		onWrite:  func() {},
631	})
632}
633
634func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
635	if f.IsAck() {
636		return
637	}
638	var ss []http2.Setting
639	var updateFuncs []func()
640	f.ForeachSetting(func(s http2.Setting) error {
641		switch s.ID {
642		case http2.SettingMaxHeaderListSize:
643			updateFuncs = append(updateFuncs, func() {
644				t.maxSendHeaderListSize = new(uint32)
645				*t.maxSendHeaderListSize = s.Val
646			})
647		default:
648			ss = append(ss, s)
649		}
650		return nil
651	})
652	t.controlBuf.executeAndPut(func(interface{}) bool {
653		for _, f := range updateFuncs {
654			f()
655		}
656		return true
657	}, &incomingSettings{
658		ss: ss,
659	})
660}
661
const (
	// maxPingStrikes is the number of keepalive-policy violations tolerated
	// from a client; handlePing sends a GOAWAY and closes the connection
	// once pingStrikes exceeds this value.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum interval handlePing expects between
	// pings when there are no active streams and the enforcement policy
	// does not permit pings without streams.
	defaultPingTimeout = 2 * time.Hour
)
666
667func (t *http2Server) handlePing(f *http2.PingFrame) {
668	if f.IsAck() {
669		if f.Data == goAwayPing.data && t.drainChan != nil {
670			close(t.drainChan)
671			return
672		}
673		// Maybe it's a BDP ping.
674		if t.bdpEst != nil {
675			t.bdpEst.calculate(f.Data)
676		}
677		return
678	}
679	pingAck := &ping{ack: true}
680	copy(pingAck.data[:], f.Data[:])
681	t.controlBuf.put(pingAck)
682
683	now := time.Now()
684	defer func() {
685		t.lastPingAt = now
686	}()
687	// A reset ping strikes means that we don't need to check for policy
688	// violation for this ping and the pingStrikes counter should be set
689	// to 0.
690	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
691		t.pingStrikes = 0
692		return
693	}
694	t.mu.Lock()
695	ns := len(t.activeStreams)
696	t.mu.Unlock()
697	if ns < 1 && !t.kep.PermitWithoutStream {
698		// Keepalive shouldn't be active thus, this new ping should
699		// have come after at least defaultPingTimeout.
700		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
701			t.pingStrikes++
702		}
703	} else {
704		// Check if keepalive policy is respected.
705		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
706			t.pingStrikes++
707		}
708	}
709
710	if t.pingStrikes > maxPingStrikes {
711		// Send goaway and close the connection.
712		errorf("transport: Got too many pings from the client, closing the connection.")
713		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
714	}
715}
716
717func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
718	t.controlBuf.put(&incomingWindowUpdate{
719		streamID:  f.Header().StreamID,
720		increment: f.Increment,
721	})
722}
723
724func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
725	for k, vv := range md {
726		if isReservedHeader(k) {
727			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
728			continue
729		}
730		for _, v := range vv {
731			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
732		}
733	}
734	return headerFields
735}
736
737func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
738	if t.maxSendHeaderListSize == nil {
739		return true
740	}
741	hdrFrame := it.(*headerFrame)
742	var sz int64
743	for _, f := range hdrFrame.hf {
744		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
745			errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
746			return false
747		}
748	}
749	return true
750}
751
752// WriteHeader sends the header metedata md back to the client.
753func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
754	if s.updateHeaderSent() || s.getState() == streamDone {
755		return ErrIllegalHeaderWrite
756	}
757	s.hdrMu.Lock()
758	if md.Len() > 0 {
759		if s.header.Len() > 0 {
760			s.header = metadata.Join(s.header, md)
761		} else {
762			s.header = md
763		}
764	}
765	if err := t.writeHeaderLocked(s); err != nil {
766		s.hdrMu.Unlock()
767		return err
768	}
769	s.hdrMu.Unlock()
770	return nil
771}
772
// setResetPingStrikes atomically raises the resetPingStrikes flag. It is
// installed as the write callback of outgoing header and data frames;
// handlePing consumes the flag and clears the ping strike counter.
func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}
776
777func (t *http2Server) writeHeaderLocked(s *Stream) error {
778	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
779	// first and create a slice of that exact size.
780	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
781	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
782	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
783	if s.sendCompress != "" {
784		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
785	}
786	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
787	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
788		streamID:  s.id,
789		hf:        headerFields,
790		endStream: false,
791		onWrite:   t.setResetPingStrikes,
792	})
793	if !success {
794		if err != nil {
795			return err
796		}
797		t.closeStream(s, true, http2.ErrCodeInternal, false)
798		return ErrHeaderListSizeLimitViolation
799	}
800	if t.stats != nil {
801		// Note: WireLength is not set in outHeader.
802		// TODO(mmukhi): Revisit this later, if needed.
803		outHeader := &stats.OutHeader{}
804		t.stats.HandleRPC(s.Context(), outHeader)
805	}
806	return nil
807}
808
// WriteStatus sends stream status to the client and terminates the stream.
// There is no further I/O operations being able to perform on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
//
// If no header has been sent yet, either a separate header frame is written
// first (when header metadata is pending) or the response is sent as a
// trailers-only frame that also carries ":status" and "content-type".
// It returns nil without doing anything if the stream is already done, and
// ErrHeaderListSizeLimitViolation (after resetting the stream) if the
// trailers exceed the peer's header list size limit.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	// hdrMu is held while s.header and s.trailer are read.
	s.hdrMu.Lock()
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// Status details, when present, travel in a binary trailer. A marshal
	// failure is only logged; the status is still sent without details.
	if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}
	s.hdrMu.Unlock()
	// Only the size check runs here (execute, not executeAndPut); the frame
	// itself is handed to finishStream below.
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
	}
	return nil
}
870
871// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
872// is returns if it fails (e.g., framing error, transport error).
873func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
874	if !s.isHeaderSent() { // Headers haven't been written yet.
875		if err := t.WriteHeader(s, nil); err != nil {
876			if _, ok := err.(ConnectionError); ok {
877				return err
878			}
879			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
880			return status.Errorf(codes.Internal, "transport: %v", err)
881		}
882	} else {
883		// Writing headers checks for this condition.
884		if s.getState() == streamDone {
885			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
886			s.cancel()
887			select {
888			case <-t.ctx.Done():
889				return ErrConnClosing
890			default:
891			}
892			return ContextErr(s.ctx.Err())
893		}
894	}
895	// Add some data to header frame so that we can equally distribute bytes across frames.
896	emptyLen := http2MaxFrameLen - len(hdr)
897	if emptyLen > len(data) {
898		emptyLen = len(data)
899	}
900	hdr = append(hdr, data[:emptyLen]...)
901	data = data[emptyLen:]
902	df := &dataFrame{
903		streamID:    s.id,
904		h:           hdr,
905		d:           data,
906		onEachWrite: t.setResetPingStrikes,
907	}
908	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
909		select {
910		case <-t.ctx.Done():
911			return ErrConnClosing
912		default:
913		}
914		return ContextErr(s.ctx.Err())
915	}
916	return t.controlBuf.put(df)
917}
918
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// The goroutine exits when the transport context is done or after one of
// the closing conditions above has been acted upon.
func (t *http2Server) keepalive() {
	p := &ping{}
	// pingSent records that a keepalive ping is outstanding with no read
	// activity seen since it was queued.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		// Stop reports false when the timer already fired; the receive
		// then drains the pending value and blocks if there is none.
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// val is the idle time still allowed before the limit is hit.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			maxIdle.Reset(val)
		case <-maxAge.C:
			t.drain(http2.ErrCodeNo, []byte{})
			// Reuse the same timer for the grace period.
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				infof("transport: closing server transport due to maximum connection age.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.ctx.Done():
			}
			return
		case <-keepalive.C:
			// Read activity since the last tick: clear the flag and start
			// a fresh keepalive interval.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			if pingSent {
				// No activity even after the ping and its timeout.
				infof("transport: closing server transport due to idleness.")
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			pingSent = true
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.kpCount, 1)
			}
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.ctx.Done():
			return
		}
	}
}
1003
1004// Close starts shutting down the http2Server transport.
1005// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1006// could cause some resource issue. Revisit this later.
1007func (t *http2Server) Close() error {
1008	t.mu.Lock()
1009	if t.state == closing {
1010		t.mu.Unlock()
1011		return errors.New("transport: Close() was already called")
1012	}
1013	t.state = closing
1014	streams := t.activeStreams
1015	t.activeStreams = nil
1016	t.mu.Unlock()
1017	t.controlBuf.finish()
1018	t.cancel()
1019	err := t.conn.Close()
1020	if channelz.IsOn() {
1021		channelz.RemoveEntry(t.channelzID)
1022	}
1023	// Cancel all active streams.
1024	for _, s := range streams {
1025		s.cancel()
1026	}
1027	if t.stats != nil {
1028		connEnd := &stats.ConnEnd{}
1029		t.stats.HandleConn(t.ctx, connEnd)
1030	}
1031	return err
1032}
1033
1034// deleteStream deletes the stream s from transport's active streams.
1035func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1036	// In case stream sending and receiving are invoked in separate
1037	// goroutines (e.g., bi-directional streaming), cancel needs to be
1038	// called to interrupt the potential blocking on other goroutines.
1039	s.cancel()
1040
1041	t.mu.Lock()
1042	if _, ok := t.activeStreams[s.id]; ok {
1043		delete(t.activeStreams, s.id)
1044		if len(t.activeStreams) == 0 {
1045			t.idle = time.Now()
1046		}
1047	}
1048	t.mu.Unlock()
1049
1050	if channelz.IsOn() {
1051		if eosReceived {
1052			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1053		} else {
1054			atomic.AddInt64(&t.czData.streamsFailed, 1)
1055		}
1056	}
1057}
1058
1059// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1060func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1061	oldState := s.swapState(streamDone)
1062	if oldState == streamDone {
1063		// If the stream was already done, return.
1064		return
1065	}
1066
1067	hdr.cleanup = &cleanupStream{
1068		streamID: s.id,
1069		rst:      rst,
1070		rstCode:  rstCode,
1071		onWrite: func() {
1072			t.deleteStream(s, eosReceived)
1073		},
1074	}
1075	t.controlBuf.put(hdr)
1076}
1077
1078// closeStream clears the footprint of a stream when the stream is not needed any more.
1079func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1080	s.swapState(streamDone)
1081	t.deleteStream(s, eosReceived)
1082
1083	t.controlBuf.put(&cleanupStream{
1084		streamID: s.id,
1085		rst:      rst,
1086		rstCode:  rstCode,
1087		onWrite:  func() {},
1088	})
1089}
1090
// RemoteAddr returns the remote address stored on the transport.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}
1094
// Drain starts a graceful shutdown of the transport by calling drain with
// http2.ErrCodeNo and empty debug data.
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}
1098
1099func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1100	t.mu.Lock()
1101	defer t.mu.Unlock()
1102	if t.drainChan != nil {
1103		return
1104	}
1105	t.drainChan = make(chan struct{})
1106	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1107}
1108
// goAwayPing holds the fixed 8-byte payload of the ping that
// outgoingGoAwayHandler writes right after the heads-up GoAway frame.
// NOTE(review): the ack for this payload is presumably what closes drainChan;
// that handling is outside this part of the file — confirm.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1110
// outgoingGoAwayHandler handles an outgoing GoAway and returns true if loopy
// needs to put itself in draining mode.
//
// A goAway with headsUp set is the first step of a graceful close: a GoAway
// with the maximum stream ID and a ping are written, and a goroutine later
// queues the final goAway (headsUp unset). For the final goAway the transport
// state becomes draining and a GoAway with the current max stream ID is
// written; if no streams are active, the write buffer is flushed and an error
// is returned.
//
// It returns (false, ErrConnClosing) when the transport is already closing and
// (false, err) when writing a frame fails.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	// Read the max stream ID while holding the lock; it is used for the final
	// GoAway below.
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			// Nothing left to wait for: the connection can be closed right
			// after this GoAway is written.
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter).  But flush out what's inside the buffer first.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		// Wait for drainChan, a one-minute timer or the end of the transport
		// context, whichever comes first.
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.ctx.Done():
			// The transport is gone; there is no point in a second GoAway.
			return
		}
		// Queue the final GoAway (headsUp left unset) with the caller's code
		// and debug data.
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1165
1166func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1167	s := channelz.SocketInternalMetric{
1168		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1169		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1170		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1171		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1172		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1173		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1174		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1175		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1176		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1177		LocalFlowControlWindow:           int64(t.fc.getSize()),
1178		SocketOptions:                    channelz.GetSocketOption(t.conn),
1179		LocalAddr:                        t.localAddr,
1180		RemoteAddr:                       t.remoteAddr,
1181		// RemoteName :
1182	}
1183	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1184		s.Security = au.GetSecurityValue()
1185	}
1186	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1187	return &s
1188}
1189
// IncrMsgSent increments the channelz sent-message counter and records the
// current time as the last-message-sent timestamp.
func (t *http2Server) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
1194
// IncrMsgRecv increments the channelz received-message counter and records the
// current time as the last-message-received timestamp.
func (t *http2Server) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
1199
1200func (t *http2Server) getOutFlowWindow() int64 {
1201	resp := make(chan uint32, 1)
1202	timer := time.NewTimer(time.Second)
1203	defer timer.Stop()
1204	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1205	select {
1206	case sz := <-resp:
1207		return int64(sz)
1208	case <-t.ctxDone:
1209		return -1
1210	case <-timer.C:
1211		return -2
1212	}
1213}
1214
1215func getJitter(v time.Duration) time.Duration {
1216	if v == infinity {
1217		return 0
1218	}
1219	// Generate a jitter between +/- 10% of the value.
1220	r := int64(v / 10)
1221	j := grpcrand.Int63n(2*r) - r
1222	return time.Duration(j)
1223}
1224