1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"context"
24	"errors"
25	"fmt"
26	"io"
27	"math"
28	"net"
29	"strconv"
30	"sync"
31	"sync/atomic"
32	"time"
33
34	"github.com/golang/protobuf/proto"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37	"google.golang.org/grpc/internal/grpcutil"
38
39	"google.golang.org/grpc/codes"
40	"google.golang.org/grpc/credentials"
41	"google.golang.org/grpc/internal/channelz"
42	"google.golang.org/grpc/internal/grpcrand"
43	"google.golang.org/grpc/keepalive"
44	"google.golang.org/grpc/metadata"
45	"google.golang.org/grpc/peer"
46	"google.golang.org/grpc/stats"
47	"google.golang.org/grpc/status"
48	"google.golang.org/grpc/tap"
49)
50
// ErrIllegalHeaderWrite is returned by WriteHeader when the stream has
// already finished or its headers have already been written.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")

// ErrHeaderListSizeLimitViolation is returned when the outgoing header list
// would be larger than the maximum header list size advertised by the peer.
var ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")

// serverConnectionCounter counts the number of connections a server has seen
// (equal to the number of http2Servers created). Must be accessed atomically.
var serverConnectionCounter uint64
63
// http2Server implements the ServerTransport interface with HTTP2.
//
// Field usage notes (as seen in this file):
//   - lastRead and resetPingStrikes are read and written with sync/atomic.
//   - mu guards the fields declared after it (notably drainChan, state,
//     activeStreams and idle); maxStreamID is also read and updated under mu
//     in operateHeaders, and updateFlowControl writes initialWindowSize
//     under mu.
//   - initialWindowSize is the inbound window given to newly created
//     streams; it is replaced by updateFlowControl when the BDP estimator
//     computes a new size.
//   - bdpEst is nil unless dynamic window sizing is enabled (no explicit
//     stream or connection window size configured).
//   - maxSendHeaderListSize is nil until the client advertises
//     SETTINGS_MAX_HEADER_LIST_SIZE; handleSettings sets it through
//     controlBuf.executeAndPut and checkForHeaderListSize reads it.
//   - lastPingAt and pingStrikes are maintained by handlePing.
type http2Server struct {
	// Connection-level plumbing. readerDone is closed when HandleStreams
	// returns and writerDone when the loopy writer goroutine exits; loopy is
	// assigned inside that goroutine (see newHTTP2Server).
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx         context.Context
	done        chan struct{}
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams. Set to math.MaxUint32 (no limit)
	// when ServerConfig.MaxStreams is 0.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	// fc is the connection-level inbound flow control; stats is the optional
	// stats handler and may be nil.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Keepalive and max-age parameters for the server. Zero-valued fields are
	// replaced by defaults in newHTTP2Server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy. MinTime defaults to
	// defaultKeepalivePolicyMinTime when unset.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway. handlePing closes it when the ack for the GOAWAY ping
	// arrives.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// channelzID and czData are for channelz metric collection. bufferPool
	// recycles the buffers that carry received DATA payloads to streams.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	bufferPool *bufferPool

	// connectionID is this transport's sequence number, taken from
	// serverConnectionCounter.
	connectionID uint64
}
126
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// It writes the server's initial SETTINGS frame (and, when the connection
// window is larger than the default, a connection-level WINDOW_UPDATE) to
// conn. It then reads and validates the client preface and the client's
// first frame, which must be a SETTINGS frame. On success it starts the loopy
// writer goroutine and the keepalive goroutine.
//
// NOTE(review): if reading the first frame fails with io.EOF or
// io.ErrUnexpectedEOF, that error is returned as-is rather than wrapped in a
// ConnectionError. Errors from reading the preface or first frame also close
// the transport through the deferred t.Close().
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	writeBufSize := config.WriteBufferSize
	readBufSize := config.ReadBufferSize
	maxHeaderListSize := defaultServerMaxHeaderListSize
	if config.MaxHeaderListSize != nil {
		maxHeaderListSize = *config.MaxHeaderListSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
	// Send initial settings as connection preface to client.
	isettings := []http2.Setting{{
		ID:  http2.SettingMaxFrameSize,
		Val: http2MaxFrameLen,
	}}
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// Configured window sizes below defaultWindowSize are ignored. Setting
	// either window explicitly disables dynamic (BDP-based) window sizing.
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	// Only advertise the stream window when it differs from the HTTP/2
	// default the client already assumes.
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if config.MaxHeaderListSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *config.MaxHeaderListSize,
		})
	}
	if config.HeaderTableSize != nil {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingHeaderTableSize,
			Val: *config.HeaderTableSize,
		})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	// The connection window cannot be changed via SETTINGS, so grow it with a
	// WINDOW_UPDATE on stream 0.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in keepalive defaults for any zero-valued parameters.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	done := make(chan struct{})
	t := &http2Server{
		ctx:               context.Background(),
		done:              done,
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
		czData:            new(channelzData),
		bufferPool:        newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.done)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
	}

	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)

	// Push the buffered SETTINGS (and WINDOW_UPDATE) onto the wire.
	// NOTE(review): the Flush error is ignored here; a broken connection
	// presumably surfaces as a read error below — confirm.
	t.framer.writer.Flush()

	// From here on, any error return tears the transport down.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Start the loopy writer over framer and controlBuf. When its run loop
	// returns, the connection is closed and writerDone is closed.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		if err := t.loopy.run(); err != nil {
			if logger.V(logLevel) {
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
			}
		}
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
302
// operateHeaders takes action on the decoded headers.
//
// It builds a new server-side Stream for the HEADERS frame and derives its
// context from the transport context. That context carries the request
// timeout (when one was sent), peer info, incoming metadata, stats
// tags/trace, the optional tap handle's result and traceCtx. The stream is
// then registered in activeStreams and with loopy, and handle(s) is called.
//
// It returns fatal=true only when the stream ID is illegal: even, or not
// greater than every previously accepted ID. The caller is expected to close
// the transport in that case. Every other rejection returns false; some of
// them queue a RST_STREAM first. Those rejections are: a header decode
// error, a tap handle refusal, a transport that is no longer reachable, and
// the concurrency limit being reached.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	streamID := frame.Header().StreamID
	state := &decodeState{
		serverSide: true,
	}
	if h2code, err := state.decodeHeader(frame); err != nil {
		// Only status errors get a RST_STREAM; other decode errors just drop
		// the frame.
		if _, ok := status.FromError(err); ok {
			t.controlBuf.put(&cleanupStream{
				streamID: streamID,
				rst:      true,
				rstCode:  h2code,
				onWrite:  func() {},
			})
		}
		return false
	}

	buf := newRecvBuffer()
	s := &Stream{
		id:             streamID,
		st:             t,
		buf:            buf,
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		// The client half-closed with the headers: there is nothing to read.
		s.state = streamReadDone
	}
	// Derive the stream context from the transport context, applying the
	// request's timeout when one was sent.
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Attach the received metadata to the context.
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	if state.data.statsTags != nil {
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
	}
	if state.data.statsTrace != nil {
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
	}
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.data.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			}
			t.controlBuf.put(&cleanupStream{
				streamID: s.id,
				rst:      true,
				rstCode:  http2.ErrCodeRefusedStream,
				onWrite:  func() {},
			})
			s.cancel()
			return false
		}
	}
	// Admission checks under t.mu: the transport must still accept streams,
	// the concurrency limit must not be reached, and stream IDs must be odd
	// and strictly increasing.
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return false
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return false
	}
	if streamID%2 != 1 || streamID <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		if logger.V(logLevel) {
			logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
		}
		s.cancel()
		return true
	}
	t.maxStreamID = streamID
	t.activeStreams[streamID] = s
	// First active stream: the connection is no longer idle (the keepalive
	// goroutine reads t.idle).
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	// requestRead presumably receives the size the application is about to
	// read, so the stream window can be grown ahead of time (adjustWindow).
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      metadata.MD(state.data.mdata).Copy(),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	// The reader returns consumed bytes to the stream window via updateWindow.
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return false
}
451
452// HandleStreams receives incoming streams using the given handler. This is
453// typically run in a separate goroutine.
454// traceCtx attaches trace to ctx and returns the new context.
455func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
456	defer close(t.readerDone)
457	for {
458		t.controlBuf.throttle()
459		frame, err := t.framer.fr.ReadFrame()
460		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
461		if err != nil {
462			if se, ok := err.(http2.StreamError); ok {
463				if logger.V(logLevel) {
464					logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
465				}
466				t.mu.Lock()
467				s := t.activeStreams[se.StreamID]
468				t.mu.Unlock()
469				if s != nil {
470					t.closeStream(s, true, se.Code, false)
471				} else {
472					t.controlBuf.put(&cleanupStream{
473						streamID: se.StreamID,
474						rst:      true,
475						rstCode:  se.Code,
476						onWrite:  func() {},
477					})
478				}
479				continue
480			}
481			if err == io.EOF || err == io.ErrUnexpectedEOF {
482				t.Close()
483				return
484			}
485			if logger.V(logLevel) {
486				logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
487			}
488			t.Close()
489			return
490		}
491		switch frame := frame.(type) {
492		case *http2.MetaHeadersFrame:
493			if t.operateHeaders(frame, handle, traceCtx) {
494				t.Close()
495				break
496			}
497		case *http2.DataFrame:
498			t.handleData(frame)
499		case *http2.RSTStreamFrame:
500			t.handleRSTStream(frame)
501		case *http2.SettingsFrame:
502			t.handleSettings(frame)
503		case *http2.PingFrame:
504			t.handlePing(frame)
505		case *http2.WindowUpdateFrame:
506			t.handleWindowUpdate(frame)
507		case *http2.GoAwayFrame:
508			// TODO: Handle GoAway from the client appropriately.
509		default:
510			if logger.V(logLevel) {
511				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
512			}
513		}
514	}
515}
516
517func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
518	t.mu.Lock()
519	defer t.mu.Unlock()
520	if t.activeStreams == nil {
521		// The transport is closing.
522		return nil, false
523	}
524	s, ok := t.activeStreams[f.Header().StreamID]
525	if !ok {
526		// The stream is already done.
527		return nil, false
528	}
529	return s, true
530}
531
532// adjustWindow sends out extra window update over the initial window size
533// of stream if the application is requesting data larger in size than
534// the window.
535func (t *http2Server) adjustWindow(s *Stream, n uint32) {
536	if w := s.fc.maybeAdjust(n); w > 0 {
537		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
538	}
539
540}
541
542// updateWindow adjusts the inbound quota for the stream and the transport.
543// Window updates will deliver to the controller for sending when
544// the cumulative quota exceeds the corresponding threshold.
545func (t *http2Server) updateWindow(s *Stream, n uint32) {
546	if w := s.fc.onRead(n); w > 0 {
547		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
548			increment: w,
549		})
550	}
551}
552
553// updateFlowControl updates the incoming flow control windows
554// for the transport and the stream based on the current bdp
555// estimation.
556func (t *http2Server) updateFlowControl(n uint32) {
557	t.mu.Lock()
558	for _, s := range t.activeStreams {
559		s.fc.newLimit(n)
560	}
561	t.initialWindowSize = int32(n)
562	t.mu.Unlock()
563	t.controlBuf.put(&outgoingWindowUpdate{
564		streamID:  0,
565		increment: t.fc.newLimit(n),
566	})
567	t.controlBuf.put(&outgoingSettings{
568		ss: []http2.Setting{
569			{
570				ID:  http2.SettingInitialWindowSize,
571				Val: n,
572			},
573		},
574	})
575
576}
577
// handleData processes an incoming DATA frame.
//
// Connection-level flow control is credited immediately, whether or not the
// application has read the data. The frame size also feeds the BDP estimator
// when one is configured. The payload is then copied into a pooled buffer
// and delivered to the addressed stream's receive buffer. The stream is
// closed with a reset in two cases:
//   - DATA arrives after the client already ended its side of the stream:
//     ErrCodeStreamClosed.
//   - The DATA violates stream-level flow control: ErrCodeFlowControl.
//
// END_STREAM marks the stream read-done and delivers io.EOF to its reader.
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.
		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}
		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if s.getState() == streamReadDone {
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
			return
		}
		// Padding counts against the stream window but is never delivered
		// to the application, so return those bytes to the window now.
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.compareAndSwapState(streamActive, streamReadDone)
		s.write(recvMsg{err: io.EOF})
	}
}
644
645func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
646	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
647	if s, ok := t.getStream(f); ok {
648		t.closeStream(s, false, 0, false)
649		return
650	}
651	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
652	t.controlBuf.put(&cleanupStream{
653		streamID: f.Header().StreamID,
654		rst:      false,
655		rstCode:  0,
656		onWrite:  func() {},
657	})
658}
659
660func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
661	if f.IsAck() {
662		return
663	}
664	var ss []http2.Setting
665	var updateFuncs []func()
666	f.ForeachSetting(func(s http2.Setting) error {
667		switch s.ID {
668		case http2.SettingMaxHeaderListSize:
669			updateFuncs = append(updateFuncs, func() {
670				t.maxSendHeaderListSize = new(uint32)
671				*t.maxSendHeaderListSize = s.Val
672			})
673		default:
674			ss = append(ss, s)
675		}
676		return nil
677	})
678	t.controlBuf.executeAndPut(func(interface{}) bool {
679		for _, f := range updateFuncs {
680			f()
681		}
682		return true
683	}, &incomingSettings{
684		ss: ss,
685	})
686}
687
// maxPingStrikes is the number of keepalive policy violations tolerated;
// one more makes handlePing queue GOAWAY(ENHANCE_YOUR_CALM) and close the
// connection.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing expected between client pings
// when no streams are active and the policy does not permit pings without
// streams.
const defaultPingTimeout = time.Hour * 2
692
693func (t *http2Server) handlePing(f *http2.PingFrame) {
694	if f.IsAck() {
695		if f.Data == goAwayPing.data && t.drainChan != nil {
696			close(t.drainChan)
697			return
698		}
699		// Maybe it's a BDP ping.
700		if t.bdpEst != nil {
701			t.bdpEst.calculate(f.Data)
702		}
703		return
704	}
705	pingAck := &ping{ack: true}
706	copy(pingAck.data[:], f.Data[:])
707	t.controlBuf.put(pingAck)
708
709	now := time.Now()
710	defer func() {
711		t.lastPingAt = now
712	}()
713	// A reset ping strikes means that we don't need to check for policy
714	// violation for this ping and the pingStrikes counter should be set
715	// to 0.
716	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
717		t.pingStrikes = 0
718		return
719	}
720	t.mu.Lock()
721	ns := len(t.activeStreams)
722	t.mu.Unlock()
723	if ns < 1 && !t.kep.PermitWithoutStream {
724		// Keepalive shouldn't be active thus, this new ping should
725		// have come after at least defaultPingTimeout.
726		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
727			t.pingStrikes++
728		}
729	} else {
730		// Check if keepalive policy is respected.
731		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
732			t.pingStrikes++
733		}
734	}
735
736	if t.pingStrikes > maxPingStrikes {
737		// Send goaway and close the connection.
738		if logger.V(logLevel) {
739			logger.Errorf("transport: Got too many pings from the client, closing the connection.")
740		}
741		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
742	}
743}
744
745func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
746	t.controlBuf.put(&incomingWindowUpdate{
747		streamID:  f.Header().StreamID,
748		increment: f.Increment,
749	})
750}
751
752func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
753	for k, vv := range md {
754		if isReservedHeader(k) {
755			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
756			continue
757		}
758		for _, v := range vv {
759			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
760		}
761	}
762	return headerFields
763}
764
765func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
766	if t.maxSendHeaderListSize == nil {
767		return true
768	}
769	hdrFrame := it.(*headerFrame)
770	var sz int64
771	for _, f := range hdrFrame.hf {
772		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
773			if logger.V(logLevel) {
774				logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
775			}
776			return false
777		}
778	}
779	return true
780}
781
782// WriteHeader sends the header metadata md back to the client.
783func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
784	if s.updateHeaderSent() || s.getState() == streamDone {
785		return ErrIllegalHeaderWrite
786	}
787	s.hdrMu.Lock()
788	if md.Len() > 0 {
789		if s.header.Len() > 0 {
790			s.header = metadata.Join(s.header, md)
791		} else {
792			s.header = md
793		}
794	}
795	if err := t.writeHeaderLocked(s); err != nil {
796		s.hdrMu.Unlock()
797		return err
798	}
799	s.hdrMu.Unlock()
800	return nil
801}
802
803func (t *http2Server) setResetPingStrikes() {
804	atomic.StoreUint32(&t.resetPingStrikes, 1)
805}
806
807func (t *http2Server) writeHeaderLocked(s *Stream) error {
808	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
809	// first and create a slice of that exact size.
810	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
811	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
812	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
813	if s.sendCompress != "" {
814		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
815	}
816	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
817	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
818		streamID:  s.id,
819		hf:        headerFields,
820		endStream: false,
821		onWrite:   t.setResetPingStrikes,
822	})
823	if !success {
824		if err != nil {
825			return err
826		}
827		t.closeStream(s, true, http2.ErrCodeInternal, false)
828		return ErrHeaderListSizeLimitViolation
829	}
830	if t.stats != nil {
831		// Note: Headers are compressed with hpack after this call returns.
832		// No WireLength field is set here.
833		outHeader := &stats.OutHeader{
834			Header:      s.header.Copy(),
835			Compression: s.sendCompress,
836		}
837		t.stats.HandleRPC(s.Context(), outHeader)
838	}
839	return nil
840}
841
// WriteStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
//
// If headers have not been sent yet, there are two cases:
//   - s.header is non-empty: a separate HEADERS frame is written first.
//   - otherwise: the status goes out as a trailers-only response that also
//     carries :status and content-type.
//
// The trailers carry grpc-status, grpc-message, grpc-status-details-bin
// (when the status has details) and s.trailer. Calling WriteStatus on a
// stream that is already done is a no-op that returns nil.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	if s.getState() == streamDone {
		return nil
	}
	s.hdrMu.Lock()
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
	// first and create a slice of that exact size.
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
	if !s.updateHeaderSent() {                      // No headers have been sent.
		if len(s.header) > 0 { // Send a separate header frame.
			if err := t.writeHeaderLocked(s); err != nil {
				s.hdrMu.Unlock()
				return err
			}
		} else { // Send a trailer only response.
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
		}
	}
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	// Rich error details are only serialized when the status carries any.
	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
		} else {
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
		}
	}

	// Attach the trailer metadata.
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
	trailingHeader := &headerFrame{
		streamID:  s.id,
		hf:        headerFields,
		endStream: true,
		onWrite:   t.setResetPingStrikes,
	}
	s.hdrMu.Unlock()
	// Only run the size check here; the frame itself is queued through
	// finishStream below.
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
	if !success {
		if err != nil {
			return err
		}
		t.closeStream(s, true, http2.ErrCodeInternal, false)
		return ErrHeaderListSizeLimitViolation
	}
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
	rst := s.getState() == streamActive
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
	if t.stats != nil {
		// Note: The trailer fields are compressed with hpack after this call returns.
		// No WireLength field is set here.
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
			Trailer: s.trailer.Copy(),
		})
	}
	return nil
}
907
908// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
909// is returns if it fails (e.g., framing error, transport error).
910func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
911	if !s.isHeaderSent() { // Headers haven't been written yet.
912		if err := t.WriteHeader(s, nil); err != nil {
913			if _, ok := err.(ConnectionError); ok {
914				return err
915			}
916			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
917			return status.Errorf(codes.Internal, "transport: %v", err)
918		}
919	} else {
920		// Writing headers checks for this condition.
921		if s.getState() == streamDone {
922			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
923			s.cancel()
924			select {
925			case <-t.done:
926				return ErrConnClosing
927			default:
928			}
929			return ContextErr(s.ctx.Err())
930		}
931	}
932	df := &dataFrame{
933		streamID:    s.id,
934		h:           hdr,
935		d:           data,
936		onEachWrite: t.setResetPingStrikes,
937	}
938	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
939		select {
940		case <-t.done:
941			return ErrConnClosing
942		default:
943		}
944		return ContextErr(s.ctx.Err())
945	}
946	return t.controlBuf.put(df)
947}
948
// keepalive runs in a separate goroutine and does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// "Gracefully" means t.drain(http2.ErrCodeNo, ...), which enqueues a GOAWAY;
// "forcibly" means t.Close(). keepalive returns when t.done is closed, after
// draining an idle connection, after the MaxConnectionAge grace period ends
// (or the transport closes during it), or after closing an unresponsive
// connection.
func (t *http2Server) keepalive() {
	// PING frame with an all-zero payload, reused for every keepalive ping.
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			// t.idle is read under t.mu; a zero value means there are active
			// streams, so just re-arm for a full MaxConnectionIdle.
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// Time still left before the connection has been idle for
			// MaxConnectionIdle.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				return
			}
			// Still within the idle budget: fire again when it would run out.
			idleTimer.Reset(val)
		case <-ageTimer.C:
			// MaxConnectionAge reached: start a graceful shutdown, then reuse
			// ageTimer for the grace period. Reset without draining is safe
			// because the value was just received from ageTimer.C.
			t.drain(http2.ErrCodeNo, []byte{})
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to maximum connection age.")
				}
				t.Close()
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// A ping is outstanding and its whole kp.Timeout budget has been
			// slept through with no reads: treat the peer as unresponsive.
			if outstandingPing && kpTimeoutLeft <= 0 {
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to idleness.")
				}
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1048
1049// Close starts shutting down the http2Server transport.
1050// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1051// could cause some resource issue. Revisit this later.
1052func (t *http2Server) Close() error {
1053	t.mu.Lock()
1054	if t.state == closing {
1055		t.mu.Unlock()
1056		return errors.New("transport: Close() was already called")
1057	}
1058	t.state = closing
1059	streams := t.activeStreams
1060	t.activeStreams = nil
1061	t.mu.Unlock()
1062	t.controlBuf.finish()
1063	close(t.done)
1064	err := t.conn.Close()
1065	if channelz.IsOn() {
1066		channelz.RemoveEntry(t.channelzID)
1067	}
1068	// Cancel all active streams.
1069	for _, s := range streams {
1070		s.cancel()
1071	}
1072	if t.stats != nil {
1073		connEnd := &stats.ConnEnd{}
1074		t.stats.HandleConn(t.ctx, connEnd)
1075	}
1076	return err
1077}
1078
1079// deleteStream deletes the stream s from transport's active streams.
1080func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1081	// In case stream sending and receiving are invoked in separate
1082	// goroutines (e.g., bi-directional streaming), cancel needs to be
1083	// called to interrupt the potential blocking on other goroutines.
1084	s.cancel()
1085
1086	t.mu.Lock()
1087	if _, ok := t.activeStreams[s.id]; ok {
1088		delete(t.activeStreams, s.id)
1089		if len(t.activeStreams) == 0 {
1090			t.idle = time.Now()
1091		}
1092	}
1093	t.mu.Unlock()
1094
1095	if channelz.IsOn() {
1096		if eosReceived {
1097			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1098		} else {
1099			atomic.AddInt64(&t.czData.streamsFailed, 1)
1100		}
1101	}
1102}
1103
1104// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1105func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1106	oldState := s.swapState(streamDone)
1107	if oldState == streamDone {
1108		// If the stream was already done, return.
1109		return
1110	}
1111
1112	hdr.cleanup = &cleanupStream{
1113		streamID: s.id,
1114		rst:      rst,
1115		rstCode:  rstCode,
1116		onWrite: func() {
1117			t.deleteStream(s, eosReceived)
1118		},
1119	}
1120	t.controlBuf.put(hdr)
1121}
1122
1123// closeStream clears the footprint of a stream when the stream is not needed any more.
1124func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1125	s.swapState(streamDone)
1126	t.deleteStream(s, eosReceived)
1127
1128	t.controlBuf.put(&cleanupStream{
1129		streamID: s.id,
1130		rst:      rst,
1131		rstCode:  rstCode,
1132		onWrite:  func() {},
1133	})
1134}
1135
1136func (t *http2Server) RemoteAddr() net.Addr {
1137	return t.remoteAddr
1138}
1139
1140func (t *http2Server) Drain() {
1141	t.drain(http2.ErrCodeNo, []byte{})
1142}
1143
1144func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1145	t.mu.Lock()
1146	defer t.mu.Unlock()
1147	if t.drainChan != nil {
1148		return
1149	}
1150	t.drainChan = make(chan struct{})
1151	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1152}
1153
// goAwayPing is the PING written right after the first (MaxUint32) GOAWAY of a
// graceful shutdown in outgoingGoAwayHandler. Its fixed payload presumably lets
// the ping-ack handler elsewhere in this file recognize the ack and close
// t.drainChan — confirm against that handler before changing the bytes.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1155
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
//
// A goAway with headsUp set is the first phase of a graceful shutdown. It writes
// a GOAWAY with stream ID MaxUint32 followed by goAwayPing. It then starts a
// goroutine that enqueues the second-phase goAway (headsUp false, same
// code/debugData) once t.drainChan is closed or one minute passes. That
// goroutine enqueues nothing if t.done closes first.
//
// A goAway without headsUp moves the transport to draining and writes a GOAWAY
// carrying t.maxStreamID. If no streams are active, it flushes and returns a
// non-nil error (presumably so loopy exits and the connection is closed —
// confirm in loopy). Otherwise it returns true.
//
// If the transport is already closing, it returns ErrConnClosing.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	// Highest stream ID seen so far; read under t.mu.
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter).  But flush out what's inside the buffer first.
			// NOTE(review): the Flush error is ignored; an error is returned
			// below regardless.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			// Transport already closed; no second GoAway is needed.
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1210
1211func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1212	s := channelz.SocketInternalMetric{
1213		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1214		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1215		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1216		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1217		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1218		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1219		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1220		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1221		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1222		LocalFlowControlWindow:           int64(t.fc.getSize()),
1223		SocketOptions:                    channelz.GetSocketOption(t.conn),
1224		LocalAddr:                        t.localAddr,
1225		RemoteAddr:                       t.remoteAddr,
1226		// RemoteName :
1227	}
1228	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1229		s.Security = au.GetSecurityValue()
1230	}
1231	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1232	return &s
1233}
1234
1235func (t *http2Server) IncrMsgSent() {
1236	atomic.AddInt64(&t.czData.msgSent, 1)
1237	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1238}
1239
1240func (t *http2Server) IncrMsgRecv() {
1241	atomic.AddInt64(&t.czData.msgRecv, 1)
1242	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1243}
1244
1245func (t *http2Server) getOutFlowWindow() int64 {
1246	resp := make(chan uint32, 1)
1247	timer := time.NewTimer(time.Second)
1248	defer timer.Stop()
1249	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1250	select {
1251	case sz := <-resp:
1252		return int64(sz)
1253	case <-t.done:
1254		return -1
1255	case <-timer.C:
1256		return -2
1257	}
1258}
1259
1260func getJitter(v time.Duration) time.Duration {
1261	if v == infinity {
1262		return 0
1263	}
1264	// Generate a jitter between +/- 10% of the value.
1265	r := int64(v / 10)
1266	j := grpcrand.Int63n(2*r) - r
1267	return time.Duration(j)
1268}
1269