1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"errors"
24	"fmt"
25	"io"
26	"math"
27	"math/rand"
28	"net"
29	"strconv"
30	"sync"
31	"sync/atomic"
32	"time"
33
34	"github.com/golang/protobuf/proto"
35	"golang.org/x/net/context"
36	"golang.org/x/net/http2"
37	"golang.org/x/net/http2/hpack"
38
39	"google.golang.org/grpc/channelz"
40	"google.golang.org/grpc/codes"
41	"google.golang.org/grpc/credentials"
42	"google.golang.org/grpc/keepalive"
43	"google.golang.org/grpc/metadata"
44	"google.golang.org/grpc/peer"
45	"google.golang.org/grpc/stats"
46	"google.golang.org/grpc/status"
47	"google.golang.org/grpc/tap"
48)
49
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state. WriteHeader returns it when the header has already been
// written for the stream, or when the stream is already done.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
53
// http2Server implements the ServerTransport interface with HTTP2.
//
// Concurrency notes (from the code in this file): mu guards the fields
// declared after it; czmu guards the channelz counters; activity and
// resetPingStrikes are accessed atomically.
type http2Server struct {
	ctx         context.Context
	ctxDone     <-chan struct{} // Cache the context.Done() chan
	cancel      context.CancelFunc
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams. newHTTP2Server stores
	// math.MaxUint32 here when the configured limit is 0 (no limit).
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level (stream 0) inbound flow control state.
	fc *trInFlow
	// stats is the optional stats handler; nil when none is configured.
	stats stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time the last non-ACK ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is the inbound flow-control window given to each new
	// stream; updateFlowControl changes it when BDP estimation is active.
	initialWindowSize int32
	// bdpEst is nil unless dynamic (BDP-based) window sizing is enabled.
	bdpEst *bdpEstimator

	mu sync.Mutex // guards the fields below

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czmu       sync.RWMutex
	// kpCount is the number of keepalive pings sent by the server.
	kpCount int64
	// The number of streams that have started, including already finished ones.
	streamsStarted int64
	// The number of streams that have ended successfully by sending frame with
	// EoS bit set.
	streamsSucceeded  int64
	streamsFailed     int64
	lastStreamCreated time.Time
	msgSent           int64
	msgRecv           int64
	lastMsgSent       time.Time
	lastMsgRecv       time.Time
}
128
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
//
// It writes the server's initial SETTINGS (and a connection WINDOW_UPDATE when
// the connection window is larger than the default), then reads and validates
// the client preface and the client's first frame, which must be SETTINGS.
// On success it starts the writer (loopy) goroutine and the keepalive
// goroutine. An io.EOF or io.ErrUnexpectedEOF while reading the first frame is
// returned unwrapped; other failures are wrapped with connectionErrorf.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	// Non-positive buffer sizes in config fall back to the package defaults.
	writeBufSize := defaultWriteBufSize
	if config.WriteBufferSize > 0 {
		writeBufSize = config.WriteBufferSize
	}
	readBufSize := defaultReadBufSize
	if config.ReadBufferSize > 0 {
		readBufSize = config.ReadBufferSize
	}
	framer := newFramer(conn, writeBufSize, readBufSize)
	// Send initial settings as connection preface to client.
	var isettings []http2.Setting
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	// MaxStreams == 0 is treated as "unlimited": nothing is advertised and the
	// internal limit becomes math.MaxUint32.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	// Dynamic (BDP-estimated) windows stay enabled only if neither the stream
	// nor the connection window was explicitly configured (>= default).
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	// Only advertise the stream window when it differs from the HTTP/2 default.
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if err := framer.fr.WriteSettings(isettings...); err != nil {
		return nil, connectionErrorf(false, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	// NOTE(review): this assumes icwz >= defaultWindowSize (true for the
	// configured branch above; presumably also for the initialWindowSize
	// constant) — otherwise the uint32 conversion would wrap.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
			return nil, connectionErrorf(false, err, "transport: %v", err)
		}
	}
	// Fill in defaults for any keepalive parameter left at zero.
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	ctx, cancel := context.WithCancel(context.Background())
	t := &http2Server{
		ctx:               ctx,
		cancel:            cancel,
		ctxDone:           ctx.Done(),
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		readerDone:        make(chan struct{}),
		writerDone:        make(chan struct{}),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		fc:                &trInFlow{limit: uint32(icwz)},
		state:             reachable,
		activeStreams:     make(map[uint32]*Stream),
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "")
	}
	// Push the SETTINGS / WINDOW_UPDATE written above onto the wire.
	t.framer.writer.Flush()

	// From here on, any return with a non-nil (named) err closes the transport.
	defer func() {
		if err != nil {
			t.Close()
		}
	}()

	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
	}
	if !bytes.Equal(preface, clientPreface) {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
	}

	// The first frame after the preface must be the client's SETTINGS.
	frame, err := t.framer.fr.ReadFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		return nil, err
	}
	if err != nil {
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
	}
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
	}
	t.handleSettings(sf)

	// Writer goroutine: runs loopy until it stops, then closes the connection
	// and signals writerDone.
	go func() {
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
		t.loopy.run()
		t.conn.Close()
		close(t.writerDone)
	}()
	go t.keepalive()
	return t, nil
}
283
284// operateHeader takes action on the decoded headers.
285func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
286	streamID := frame.Header().StreamID
287	var state decodeState
288	for _, hf := range frame.Fields {
289		if err := state.processHeaderField(hf); err != nil {
290			if se, ok := err.(StreamError); ok {
291				t.controlBuf.put(&cleanupStream{
292					streamID: streamID,
293					rst:      true,
294					rstCode:  statusCodeConvTab[se.Code],
295					onWrite:  func() {},
296				})
297			}
298			return
299		}
300	}
301
302	buf := newRecvBuffer()
303	s := &Stream{
304		id:             streamID,
305		st:             t,
306		buf:            buf,
307		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
308		recvCompress:   state.encoding,
309		method:         state.method,
310		contentSubtype: state.contentSubtype,
311	}
312	if frame.StreamEnded() {
313		// s is just created by the caller. No lock needed.
314		s.state = streamReadDone
315	}
316	if state.timeoutSet {
317		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
318	} else {
319		s.ctx, s.cancel = context.WithCancel(t.ctx)
320	}
321	pr := &peer.Peer{
322		Addr: t.remoteAddr,
323	}
324	// Attach Auth info if there is any.
325	if t.authInfo != nil {
326		pr.AuthInfo = t.authInfo
327	}
328	s.ctx = peer.NewContext(s.ctx, pr)
329	// Attach the received metadata to the context.
330	if len(state.mdata) > 0 {
331		s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
332	}
333	if state.statsTags != nil {
334		s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
335	}
336	if state.statsTrace != nil {
337		s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
338	}
339	if t.inTapHandle != nil {
340		var err error
341		info := &tap.Info{
342			FullMethodName: state.method,
343		}
344		s.ctx, err = t.inTapHandle(s.ctx, info)
345		if err != nil {
346			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
347			t.controlBuf.put(&cleanupStream{
348				streamID: s.id,
349				rst:      true,
350				rstCode:  http2.ErrCodeRefusedStream,
351				onWrite:  func() {},
352			})
353			return
354		}
355	}
356	t.mu.Lock()
357	if t.state != reachable {
358		t.mu.Unlock()
359		return
360	}
361	if uint32(len(t.activeStreams)) >= t.maxStreams {
362		t.mu.Unlock()
363		t.controlBuf.put(&cleanupStream{
364			streamID: streamID,
365			rst:      true,
366			rstCode:  http2.ErrCodeRefusedStream,
367			onWrite:  func() {},
368		})
369		return
370	}
371	if streamID%2 != 1 || streamID <= t.maxStreamID {
372		t.mu.Unlock()
373		// illegal gRPC stream id.
374		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
375		return true
376	}
377	t.maxStreamID = streamID
378	t.activeStreams[streamID] = s
379	if len(t.activeStreams) == 1 {
380		t.idle = time.Time{}
381	}
382	t.mu.Unlock()
383	if channelz.IsOn() {
384		t.czmu.Lock()
385		t.streamsStarted++
386		t.lastStreamCreated = time.Now()
387		t.czmu.Unlock()
388	}
389	s.requestRead = func(n int) {
390		t.adjustWindow(s, uint32(n))
391	}
392	s.ctx = traceCtx(s.ctx, s.method)
393	if t.stats != nil {
394		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
395		inHeader := &stats.InHeader{
396			FullMethod:  s.method,
397			RemoteAddr:  t.remoteAddr,
398			LocalAddr:   t.localAddr,
399			Compression: s.recvCompress,
400			WireLength:  int(frame.Header().Length),
401		}
402		t.stats.HandleRPC(s.ctx, inHeader)
403	}
404	s.ctxDone = s.ctx.Done()
405	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
406	s.trReader = &transportReader{
407		reader: &recvBufferReader{
408			ctx:     s.ctx,
409			ctxDone: s.ctxDone,
410			recv:    s.buf,
411		},
412		windowHandler: func(n int) {
413			t.updateWindow(s, uint32(n))
414		},
415	}
416	handle(s)
417	return
418}
419
420// HandleStreams receives incoming streams using the given handler. This is
421// typically run in a separate goroutine.
422// traceCtx attaches trace to ctx and returns the new context.
423func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
424	defer close(t.readerDone)
425	for {
426		frame, err := t.framer.fr.ReadFrame()
427		atomic.StoreUint32(&t.activity, 1)
428		if err != nil {
429			if se, ok := err.(http2.StreamError); ok {
430				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
431				t.mu.Lock()
432				s := t.activeStreams[se.StreamID]
433				t.mu.Unlock()
434				if s != nil {
435					t.closeStream(s, true, se.Code, nil, false)
436				} else {
437					t.controlBuf.put(&cleanupStream{
438						streamID: se.StreamID,
439						rst:      true,
440						rstCode:  se.Code,
441						onWrite:  func() {},
442					})
443				}
444				continue
445			}
446			if err == io.EOF || err == io.ErrUnexpectedEOF {
447				t.Close()
448				return
449			}
450			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
451			t.Close()
452			return
453		}
454		switch frame := frame.(type) {
455		case *http2.MetaHeadersFrame:
456			if t.operateHeaders(frame, handle, traceCtx) {
457				t.Close()
458				break
459			}
460		case *http2.DataFrame:
461			t.handleData(frame)
462		case *http2.RSTStreamFrame:
463			t.handleRSTStream(frame)
464		case *http2.SettingsFrame:
465			t.handleSettings(frame)
466		case *http2.PingFrame:
467			t.handlePing(frame)
468		case *http2.WindowUpdateFrame:
469			t.handleWindowUpdate(frame)
470		case *http2.GoAwayFrame:
471			// TODO: Handle GoAway from the client appropriately.
472		default:
473			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
474		}
475	}
476}
477
478func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
479	t.mu.Lock()
480	defer t.mu.Unlock()
481	if t.activeStreams == nil {
482		// The transport is closing.
483		return nil, false
484	}
485	s, ok := t.activeStreams[f.Header().StreamID]
486	if !ok {
487		// The stream is already done.
488		return nil, false
489	}
490	return s, true
491}
492
493// adjustWindow sends out extra window update over the initial window size
494// of stream if the application is requesting data larger in size than
495// the window.
496func (t *http2Server) adjustWindow(s *Stream, n uint32) {
497	if w := s.fc.maybeAdjust(n); w > 0 {
498		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
499	}
500
501}
502
503// updateWindow adjusts the inbound quota for the stream and the transport.
504// Window updates will deliver to the controller for sending when
505// the cumulative quota exceeds the corresponding threshold.
506func (t *http2Server) updateWindow(s *Stream, n uint32) {
507	if w := s.fc.onRead(n); w > 0 {
508		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
509			increment: w,
510		})
511	}
512}
513
514// updateFlowControl updates the incoming flow control windows
515// for the transport and the stream based on the current bdp
516// estimation.
517func (t *http2Server) updateFlowControl(n uint32) {
518	t.mu.Lock()
519	for _, s := range t.activeStreams {
520		s.fc.newLimit(n)
521	}
522	t.initialWindowSize = int32(n)
523	t.mu.Unlock()
524	t.controlBuf.put(&outgoingWindowUpdate{
525		streamID:  0,
526		increment: t.fc.newLimit(n),
527	})
528	t.controlBuf.put(&outgoingSettings{
529		ss: []http2.Setting{
530			{
531				ID:  http2.SettingInitialWindowSize,
532				Val: n,
533			},
534		},
535	})
536
537}
538
539func (t *http2Server) handleData(f *http2.DataFrame) {
540	size := f.Header().Length
541	var sendBDPPing bool
542	if t.bdpEst != nil {
543		sendBDPPing = t.bdpEst.add(size)
544	}
545	// Decouple connection's flow control from application's read.
546	// An update on connection's flow control should not depend on
547	// whether user application has read the data or not. Such a
548	// restriction is already imposed on the stream's flow control,
549	// and therefore the sender will be blocked anyways.
550	// Decoupling the connection flow control will prevent other
551	// active(fast) streams from starving in presence of slow or
552	// inactive streams.
553	if w := t.fc.onData(size); w > 0 {
554		t.controlBuf.put(&outgoingWindowUpdate{
555			streamID:  0,
556			increment: w,
557		})
558	}
559	if sendBDPPing {
560		// Avoid excessive ping detection (e.g. in an L7 proxy)
561		// by sending a window update prior to the BDP ping.
562		if w := t.fc.reset(); w > 0 {
563			t.controlBuf.put(&outgoingWindowUpdate{
564				streamID:  0,
565				increment: w,
566			})
567		}
568		t.controlBuf.put(bdpPing)
569	}
570	// Select the right stream to dispatch.
571	s, ok := t.getStream(f)
572	if !ok {
573		return
574	}
575	if size > 0 {
576		if err := s.fc.onData(size); err != nil {
577			t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
578			return
579		}
580		if f.Header().Flags.Has(http2.FlagDataPadded) {
581			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
582				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
583			}
584		}
585		// TODO(bradfitz, zhaoq): A copy is required here because there is no
586		// guarantee f.Data() is consumed before the arrival of next frame.
587		// Can this copy be eliminated?
588		if len(f.Data()) > 0 {
589			data := make([]byte, len(f.Data()))
590			copy(data, f.Data())
591			s.write(recvMsg{data: data})
592		}
593	}
594	if f.Header().Flags.Has(http2.FlagDataEndStream) {
595		// Received the end of stream from the client.
596		s.compareAndSwapState(streamActive, streamReadDone)
597		s.write(recvMsg{err: io.EOF})
598	}
599}
600
601func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
602	s, ok := t.getStream(f)
603	if !ok {
604		return
605	}
606	t.closeStream(s, false, 0, nil, false)
607}
608
609func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
610	if f.IsAck() {
611		return
612	}
613	var ss []http2.Setting
614	f.ForeachSetting(func(s http2.Setting) error {
615		ss = append(ss, s)
616		return nil
617	})
618	t.controlBuf.put(&incomingSettings{
619		ss: ss,
620	})
621}
622
const (
	// maxPingStrikes is the number of keepalive-policy violations tolerated;
	// handlePing sends GOAWAY(ENHANCE_YOUR_CALM) once the count exceeds it.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum expected gap between client pings
	// when no stream is active and the policy does not permit pings
	// without streams.
	defaultPingTimeout = 2 * time.Hour
)
627
628func (t *http2Server) handlePing(f *http2.PingFrame) {
629	if f.IsAck() {
630		if f.Data == goAwayPing.data && t.drainChan != nil {
631			close(t.drainChan)
632			return
633		}
634		// Maybe it's a BDP ping.
635		if t.bdpEst != nil {
636			t.bdpEst.calculate(f.Data)
637		}
638		return
639	}
640	pingAck := &ping{ack: true}
641	copy(pingAck.data[:], f.Data[:])
642	t.controlBuf.put(pingAck)
643
644	now := time.Now()
645	defer func() {
646		t.lastPingAt = now
647	}()
648	// A reset ping strikes means that we don't need to check for policy
649	// violation for this ping and the pingStrikes counter should be set
650	// to 0.
651	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
652		t.pingStrikes = 0
653		return
654	}
655	t.mu.Lock()
656	ns := len(t.activeStreams)
657	t.mu.Unlock()
658	if ns < 1 && !t.kep.PermitWithoutStream {
659		// Keepalive shouldn't be active thus, this new ping should
660		// have come after at least defaultPingTimeout.
661		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
662			t.pingStrikes++
663		}
664	} else {
665		// Check if keepalive policy is respected.
666		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
667			t.pingStrikes++
668		}
669	}
670
671	if t.pingStrikes > maxPingStrikes {
672		// Send goaway and close the connection.
673		errorf("transport: Got too many pings from the client, closing the connection.")
674		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
675	}
676}
677
678func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
679	t.controlBuf.put(&incomingWindowUpdate{
680		streamID:  f.Header().StreamID,
681		increment: f.Increment,
682	})
683}
684
685// WriteHeader sends the header metedata md back to the client.
686func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
687	if s.headerOk || s.getState() == streamDone {
688		return ErrIllegalHeaderWrite
689	}
690	s.headerOk = true
691	if md.Len() > 0 {
692		if s.header.Len() > 0 {
693			s.header = metadata.Join(s.header, md)
694		} else {
695			s.header = md
696		}
697	}
698	md = s.header
699	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
700	// first and create a slice of that exact size.
701	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
702	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
703	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
704	if s.sendCompress != "" {
705		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
706	}
707	for k, vv := range md {
708		if isReservedHeader(k) {
709			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
710			continue
711		}
712		for _, v := range vv {
713			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
714		}
715	}
716	t.controlBuf.put(&headerFrame{
717		streamID:  s.id,
718		hf:        headerFields,
719		endStream: false,
720		onWrite: func() {
721			atomic.StoreUint32(&t.resetPingStrikes, 1)
722		},
723		wq: s.wq,
724	})
725	if t.stats != nil {
726		// Note: WireLength is not set in outHeader.
727		// TODO(mmukhi): Revisit this later, if needed.
728		outHeader := &stats.OutHeader{}
729		t.stats.HandleRPC(s.Context(), outHeader)
730	}
731	return nil
732}
733
734// WriteStatus sends stream status to the client and terminates the stream.
735// There is no further I/O operations being able to perform on this stream.
736// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
737// OK is adopted.
738func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
739	if !s.headerOk && s.header.Len() > 0 {
740		if err := t.WriteHeader(s, nil); err != nil {
741			return err
742		}
743	} else {
744		if s.getState() == streamDone {
745			return nil
746		}
747	}
748	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
749	// first and create a slice of that exact size.
750	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
751	if !s.headerOk {
752		headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
753		headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
754	}
755	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
756	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
757
758	if p := st.Proto(); p != nil && len(p.Details) > 0 {
759		stBytes, err := proto.Marshal(p)
760		if err != nil {
761			// TODO: return error instead, when callers are able to handle it.
762			panic(err)
763		}
764
765		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
766	}
767
768	// Attach the trailer metadata.
769	for k, vv := range s.trailer {
770		// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
771		if isReservedHeader(k) {
772			continue
773		}
774		for _, v := range vv {
775			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
776		}
777	}
778	trailer := &headerFrame{
779		streamID:  s.id,
780		hf:        headerFields,
781		endStream: true,
782		onWrite: func() {
783			atomic.StoreUint32(&t.resetPingStrikes, 1)
784		},
785	}
786	t.closeStream(s, false, 0, trailer, true)
787	if t.stats != nil {
788		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
789	}
790	return nil
791}
792
793// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
794// is returns if it fails (e.g., framing error, transport error).
795func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
796	if !s.headerOk { // Headers haven't been written yet.
797		if err := t.WriteHeader(s, nil); err != nil {
798			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
799			return streamErrorf(codes.Internal, "transport: %v", err)
800		}
801	} else {
802		// Writing headers checks for this condition.
803		if s.getState() == streamDone {
804			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
805			s.cancel()
806			select {
807			case <-t.ctx.Done():
808				return ErrConnClosing
809			default:
810			}
811			return ContextErr(s.ctx.Err())
812		}
813	}
814	// Add some data to header frame so that we can equally distribute bytes across frames.
815	emptyLen := http2MaxFrameLen - len(hdr)
816	if emptyLen > len(data) {
817		emptyLen = len(data)
818	}
819	hdr = append(hdr, data[:emptyLen]...)
820	data = data[emptyLen:]
821	df := &dataFrame{
822		streamID: s.id,
823		h:        hdr,
824		d:        data,
825		onEachWrite: func() {
826			atomic.StoreUint32(&t.resetPingStrikes, 1)
827		},
828	}
829	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
830		select {
831		case <-t.ctx.Done():
832			return ErrConnClosing
833		default:
834		}
835		return ContextErr(s.ctx.Err())
836	}
837	return t.controlBuf.put(df)
838}
839
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// "Gracefully" means calling t.drain, which queues a GOAWAY; "forcibly" means
// calling t.Close. The goroutine returns when t.ctx is done or right after it
// has initiated one of the shutdowns above.
func (t *http2Server) keepalive() {
	p := &ping{}
	// pingSent is true while a keepalive ping is outstanding and no read
	// activity has been observed since it was queued.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			// t.idle is the zero time while streams are active, otherwise the
			// moment the connection last became idle.
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// Remaining time before the idle limit is reached.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			maxIdle.Reset(val)
		case <-maxAge.C:
			// Max age reached: start draining, then reuse maxAge as the grace
			// timer and close forcibly if the transport outlives the grace.
			t.drain(http2.ErrCodeNo, []byte{})
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.ctx.Done():
			}
			return
		case <-keepalive.C:
			// Any frame read since the last tick proves liveness; clear the
			// flag and wait another keepalive period.
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			// No activity for a whole Timeout after our ping: give up.
			if pingSent {
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			// No activity for a keepalive period: probe with a ping and wait
			// up to Timeout for any activity.
			pingSent = true
			if channelz.IsOn() {
				t.czmu.Lock()
				t.kpCount++
				t.czmu.Unlock()
			}
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.ctx.Done():
			return
		}
	}
}
924
925// Close starts shutting down the http2Server transport.
926// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
927// could cause some resource issue. Revisit this later.
928func (t *http2Server) Close() error {
929	t.mu.Lock()
930	if t.state == closing {
931		t.mu.Unlock()
932		return errors.New("transport: Close() was already called")
933	}
934	t.state = closing
935	streams := t.activeStreams
936	t.activeStreams = nil
937	t.mu.Unlock()
938	t.controlBuf.finish()
939	t.cancel()
940	err := t.conn.Close()
941	if channelz.IsOn() {
942		channelz.RemoveEntry(t.channelzID)
943	}
944	// Cancel all active streams.
945	for _, s := range streams {
946		s.cancel()
947	}
948	if t.stats != nil {
949		connEnd := &stats.ConnEnd{}
950		t.stats.HandleConn(t.ctx, connEnd)
951	}
952	return err
953}
954
955// closeStream clears the footprint of a stream when the stream is not needed
956// any more.
957func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
958	if s.swapState(streamDone) == streamDone {
959		// If the stream was already done, return.
960		return
961	}
962	// In case stream sending and receiving are invoked in separate
963	// goroutines (e.g., bi-directional streaming), cancel needs to be
964	// called to interrupt the potential blocking on other goroutines.
965	s.cancel()
966	cleanup := &cleanupStream{
967		streamID: s.id,
968		rst:      rst,
969		rstCode:  rstCode,
970		onWrite: func() {
971			t.mu.Lock()
972			if t.activeStreams != nil {
973				delete(t.activeStreams, s.id)
974				if len(t.activeStreams) == 0 {
975					t.idle = time.Now()
976				}
977			}
978			t.mu.Unlock()
979			if channelz.IsOn() {
980				t.czmu.Lock()
981				if eosReceived {
982					t.streamsSucceeded++
983				} else {
984					t.streamsFailed++
985				}
986				t.czmu.Unlock()
987			}
988		},
989	}
990	if hdr != nil {
991		hdr.cleanup = cleanup
992		t.controlBuf.put(hdr)
993	} else {
994		t.controlBuf.put(cleanup)
995	}
996}
997
998func (t *http2Server) RemoteAddr() net.Addr {
999	return t.remoteAddr
1000}
1001
1002func (t *http2Server) Drain() {
1003	t.drain(http2.ErrCodeNo, []byte{})
1004}
1005
1006func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1007	t.mu.Lock()
1008	defer t.mu.Unlock()
1009	if t.drainChan != nil {
1010		return
1011	}
1012	t.drainChan = make(chan struct{})
1013	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1014}
1015
1016var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1017
1018// Handles outgoing GoAway and returns true if loopy needs to put itself
1019// in draining mode.
1020func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1021	t.mu.Lock()
1022	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1023		t.mu.Unlock()
1024		// The transport is closing.
1025		return false, ErrConnClosing
1026	}
1027	sid := t.maxStreamID
1028	if !g.headsUp {
1029		// Stop accepting more streams now.
1030		t.state = draining
1031		if len(t.activeStreams) == 0 {
1032			g.closeConn = true
1033		}
1034		t.mu.Unlock()
1035		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1036			return false, err
1037		}
1038		if g.closeConn {
1039			// Abruptly close the connection following the GoAway (via
1040			// loopywriter).  But flush out what's inside the buffer first.
1041			t.framer.writer.Flush()
1042			return false, fmt.Errorf("transport: Connection closing")
1043		}
1044		return true, nil
1045	}
1046	t.mu.Unlock()
1047	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1048	// Follow that with a ping and wait for the ack to come back or a timer
1049	// to expire. During this time accept new streams since they might have
1050	// originated before the GoAway reaches the client.
1051	// After getting the ack or timer expiration send out another GoAway this
1052	// time with an ID of the max stream server intends to process.
1053	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1054		return false, err
1055	}
1056	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1057		return false, err
1058	}
1059	go func() {
1060		timer := time.NewTimer(time.Minute)
1061		defer timer.Stop()
1062		select {
1063		case <-t.drainChan:
1064		case <-timer.C:
1065		case <-t.ctx.Done():
1066			return
1067		}
1068		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1069	}()
1070	return false, nil
1071}
1072
1073func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1074	t.czmu.RLock()
1075	s := channelz.SocketInternalMetric{
1076		StreamsStarted:                   t.streamsStarted,
1077		StreamsSucceeded:                 t.streamsSucceeded,
1078		StreamsFailed:                    t.streamsFailed,
1079		MessagesSent:                     t.msgSent,
1080		MessagesReceived:                 t.msgRecv,
1081		KeepAlivesSent:                   t.kpCount,
1082		LastRemoteStreamCreatedTimestamp: t.lastStreamCreated,
1083		LastMessageSentTimestamp:         t.lastMsgSent,
1084		LastMessageReceivedTimestamp:     t.lastMsgRecv,
1085		LocalFlowControlWindow:           int64(t.fc.getSize()),
1086		//socket options
1087		LocalAddr:  t.localAddr,
1088		RemoteAddr: t.remoteAddr,
1089		// Security
1090		// RemoteName :
1091	}
1092	t.czmu.RUnlock()
1093	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1094	return &s
1095}
1096
1097func (t *http2Server) IncrMsgSent() {
1098	t.czmu.Lock()
1099	t.msgSent++
1100	t.lastMsgSent = time.Now()
1101	t.czmu.Unlock()
1102}
1103
1104func (t *http2Server) IncrMsgRecv() {
1105	t.czmu.Lock()
1106	t.msgRecv++
1107	t.lastMsgRecv = time.Now()
1108	t.czmu.Unlock()
1109}
1110
1111func (t *http2Server) getOutFlowWindow() int64 {
1112	resp := make(chan uint32)
1113	timer := time.NewTimer(time.Second)
1114	defer timer.Stop()
1115	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1116	select {
1117	case sz := <-resp:
1118		return int64(sz)
1119	case <-t.ctxDone:
1120		return -1
1121	case <-timer.C:
1122		return -2
1123	}
1124}
1125
1126var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
1127
1128func getJitter(v time.Duration) time.Duration {
1129	if v == infinity {
1130		return 0
1131	}
1132	// Generate a jitter between +/- 10% of the value.
1133	r := int64(v / 10)
1134	j := rgen.Int63n(2*r) - r
1135	return time.Duration(j)
1136}
1137