1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"context"
24	"errors"
25	"fmt"
26	"io"
27	"math"
28	"net"
29	"strconv"
30	"sync"
31	"sync/atomic"
32	"time"
33
34	"github.com/golang/protobuf/proto"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37
38	spb "google.golang.org/genproto/googleapis/rpc/status"
39	"google.golang.org/grpc/codes"
40	"google.golang.org/grpc/credentials"
41	"google.golang.org/grpc/grpclog"
42	"google.golang.org/grpc/internal"
43	"google.golang.org/grpc/internal/channelz"
44	"google.golang.org/grpc/internal/grpcrand"
45	"google.golang.org/grpc/keepalive"
46	"google.golang.org/grpc/metadata"
47	"google.golang.org/grpc/peer"
48	"google.golang.org/grpc/stats"
49	"google.golang.org/grpc/status"
50	"google.golang.org/grpc/tap"
51)
52
53var (
54	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
55	// the stream's state.
56	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
57	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
58	// than the limit set by peer.
59	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60	// statusRawProto is a function to get to the raw status proto wrapped in a
61	// status.Status without a proto.Clone().
62	statusRawProto = internal.StatusRawProto.(func(*status.Status) *spb.Status)
63)
64
65// serverConnectionCounter counts the number of connections a server has seen
66// (equal to the number of http2Servers created). Must be accessed atomically.
67var serverConnectionCounter uint64
68
// http2Server implements the ServerTransport interface with HTTP2.
//
// One http2Server is created per accepted connection by newHTTP2Server.
// HandleStreams reads incoming frames. Outgoing frames are written by the
// loopyWriter goroutine started in newHTTP2Server, which is fed through
// controlBuf.
type http2Server struct {
	// lastRead holds the UnixNano time of the most recently read frame; the
	// keepalive goroutine loads it to detect read activity.
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	// ctx is the connection-level context: context.Background(), possibly
	// replaced by the stats handler's TagConn. Stream contexts derive from it.
	ctx         context.Context
	// done is handed to controlBuf; Write treats a closed done as
	// ErrConnClosing.
	done        chan struct{}
	conn        net.Conn
	// loopy is created by the writer goroutine started in newHTTP2Server.
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	// maxStreamID is written under mu by operateHeaders.
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	// inTapHandle, if non-nil, is consulted for every new stream; an error
	// refuses the stream with RST_STREAM(REFUSED_STREAM).
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams. math.MaxUint32 when
	// config.MaxStreams is 0 (no limit advertised).
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow-control window.
	fc         *trInFlow
	// stats is the optional stats handler; nil disables stats reporting.
	stats      stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	// initialWindowSize is the inbound window given to newly created streams;
	// updateFlowControl changes it when BDP estimation is enabled.
	initialWindowSize     int32
	// bdpEst is non-nil only when neither window size was configured
	// (dynamic window sizing).
	bdpEst                *bdpEstimator
	// maxSendHeaderListSize is the peer's advertised max header list size,
	// nil until the client sends one. It is set and read from callbacks run
	// through controlBuf's execute/executeAndPut.
	maxSendHeaderListSize *uint32

	// mu guards the fields that follow it (drainChan, state, activeStreams,
	// idle). operateHeaders also updates maxStreamID while holding it.
	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	// activeStreams maps stream IDs to live streams; it is nil once the
	// transport is closing.
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	// bufferPool supplies the buffers that received DATA payloads are copied
	// into before being queued on a stream.
	bufferPool *bufferPool

	// connectionID is this transport's 1-based sequence number, taken from
	// serverConnectionCounter.
	connectionID uint64
}
131
132// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
133// returned if something goes wrong.
134func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
135	writeBufSize := config.WriteBufferSize
136	readBufSize := config.ReadBufferSize
137	maxHeaderListSize := defaultServerMaxHeaderListSize
138	if config.MaxHeaderListSize != nil {
139		maxHeaderListSize = *config.MaxHeaderListSize
140	}
141	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
142	// Send initial settings as connection preface to client.
143	isettings := []http2.Setting{{
144		ID:  http2.SettingMaxFrameSize,
145		Val: http2MaxFrameLen,
146	}}
147	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
148	// permitted in the HTTP2 spec.
149	maxStreams := config.MaxStreams
150	if maxStreams == 0 {
151		maxStreams = math.MaxUint32
152	} else {
153		isettings = append(isettings, http2.Setting{
154			ID:  http2.SettingMaxConcurrentStreams,
155			Val: maxStreams,
156		})
157	}
158	dynamicWindow := true
159	iwz := int32(initialWindowSize)
160	if config.InitialWindowSize >= defaultWindowSize {
161		iwz = config.InitialWindowSize
162		dynamicWindow = false
163	}
164	icwz := int32(initialWindowSize)
165	if config.InitialConnWindowSize >= defaultWindowSize {
166		icwz = config.InitialConnWindowSize
167		dynamicWindow = false
168	}
169	if iwz != defaultWindowSize {
170		isettings = append(isettings, http2.Setting{
171			ID:  http2.SettingInitialWindowSize,
172			Val: uint32(iwz)})
173	}
174	if config.MaxHeaderListSize != nil {
175		isettings = append(isettings, http2.Setting{
176			ID:  http2.SettingMaxHeaderListSize,
177			Val: *config.MaxHeaderListSize,
178		})
179	}
180	if config.HeaderTableSize != nil {
181		isettings = append(isettings, http2.Setting{
182			ID:  http2.SettingHeaderTableSize,
183			Val: *config.HeaderTableSize,
184		})
185	}
186	if err := framer.fr.WriteSettings(isettings...); err != nil {
187		return nil, connectionErrorf(false, err, "transport: %v", err)
188	}
189	// Adjust the connection flow control window if needed.
190	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
191		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
192			return nil, connectionErrorf(false, err, "transport: %v", err)
193		}
194	}
195	kp := config.KeepaliveParams
196	if kp.MaxConnectionIdle == 0 {
197		kp.MaxConnectionIdle = defaultMaxConnectionIdle
198	}
199	if kp.MaxConnectionAge == 0 {
200		kp.MaxConnectionAge = defaultMaxConnectionAge
201	}
202	// Add a jitter to MaxConnectionAge.
203	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
204	if kp.MaxConnectionAgeGrace == 0 {
205		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
206	}
207	if kp.Time == 0 {
208		kp.Time = defaultServerKeepaliveTime
209	}
210	if kp.Timeout == 0 {
211		kp.Timeout = defaultServerKeepaliveTimeout
212	}
213	kep := config.KeepalivePolicy
214	if kep.MinTime == 0 {
215		kep.MinTime = defaultKeepalivePolicyMinTime
216	}
217	done := make(chan struct{})
218	t := &http2Server{
219		ctx:               context.Background(),
220		done:              done,
221		conn:              conn,
222		remoteAddr:        conn.RemoteAddr(),
223		localAddr:         conn.LocalAddr(),
224		authInfo:          config.AuthInfo,
225		framer:            framer,
226		readerDone:        make(chan struct{}),
227		writerDone:        make(chan struct{}),
228		maxStreams:        maxStreams,
229		inTapHandle:       config.InTapHandle,
230		fc:                &trInFlow{limit: uint32(icwz)},
231		state:             reachable,
232		activeStreams:     make(map[uint32]*Stream),
233		stats:             config.StatsHandler,
234		kp:                kp,
235		idle:              time.Now(),
236		kep:               kep,
237		initialWindowSize: iwz,
238		czData:            new(channelzData),
239		bufferPool:        newBufferPool(),
240	}
241	t.controlBuf = newControlBuffer(t.done)
242	if dynamicWindow {
243		t.bdpEst = &bdpEstimator{
244			bdp:               initialWindowSize,
245			updateFlowControl: t.updateFlowControl,
246		}
247	}
248	if t.stats != nil {
249		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
250			RemoteAddr: t.remoteAddr,
251			LocalAddr:  t.localAddr,
252		})
253		connBegin := &stats.ConnBegin{}
254		t.stats.HandleConn(t.ctx, connBegin)
255	}
256	if channelz.IsOn() {
257		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
258	}
259
260	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
261
262	t.framer.writer.Flush()
263
264	defer func() {
265		if err != nil {
266			t.Close()
267		}
268	}()
269
270	// Check the validity of client preface.
271	preface := make([]byte, len(clientPreface))
272	if _, err := io.ReadFull(t.conn, preface); err != nil {
273		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
274	}
275	if !bytes.Equal(preface, clientPreface) {
276		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
277	}
278
279	frame, err := t.framer.fr.ReadFrame()
280	if err == io.EOF || err == io.ErrUnexpectedEOF {
281		return nil, err
282	}
283	if err != nil {
284		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
285	}
286	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
287	sf, ok := frame.(*http2.SettingsFrame)
288	if !ok {
289		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
290	}
291	t.handleSettings(sf)
292
293	go func() {
294		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
295		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
296		if err := t.loopy.run(); err != nil {
297			errorf("transport: loopyWriter.run returning. Err: %v", err)
298		}
299		t.conn.Close()
300		close(t.writerDone)
301	}()
302	go t.keepalive()
303	return t, nil
304}
305
// operateHeaders takes action on the decoded headers.
//
// It decodes the HEADERS frame of a client-initiated stream, builds the
// server-side Stream and its context (deadline, peer, incoming metadata,
// stats tags/trace, tap handler), admits it into activeStreams, registers it
// with loopy, and finally hands it to handle.
//
// The result fatal is true only for an illegal stream ID; the caller is then
// expected to close the connection. Streams that are merely rejected
// (undecodable headers, tap refusal, concurrent-stream limit) get an
// RST_STREAM where shown below, or are dropped silently when the transport is
// no longer reachable, and fatal is false.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
	streamID := frame.Header().StreamID
	state := &decodeState{
		serverSide: true,
	}
	if err := state.decodeHeader(frame); err != nil {
		// Only errors that carry a gRPC status reset the stream; either way a
		// decode failure is not fatal to the connection.
		if se, ok := status.FromError(err); ok {
			t.controlBuf.put(&cleanupStream{
				streamID: streamID,
				rst:      true,
				rstCode:  statusCodeConvTab[se.Code()],
				onWrite:  func() {},
			})
		}
		return false
	}

	buf := newRecvBuffer()
	// The stream's inbound window starts at the transport's current
	// initialWindowSize.
	s := &Stream{
		id:             streamID,
		st:             t,
		buf:            buf,
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
		recvCompress:   state.data.encoding,
		method:         state.data.method,
		contentSubtype: state.data.contentSubtype,
	}
	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	// Bound the stream context by the decoded timeout when one was sent
	// (presumably the grpc-timeout header — see decodeHeader).
	if state.data.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Attach the received metadata to the context.
	if len(state.data.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
	}
	if state.data.statsTags != nil {
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
	}
	if state.data.statsTrace != nil {
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
	}
	// The tap handler may replace the stream context or refuse the stream.
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.data.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			t.controlBuf.put(&cleanupStream{
				streamID: s.id,
				rst:      true,
				rstCode:  http2.ErrCodeRefusedStream,
				onWrite:  func() {},
			})
			s.cancel()
			return false
		}
	}
	// Admission: the state check, the concurrent-stream limit, the stream-ID
	// check and the registration all happen under a single hold of t.mu.
	//
	// NOTE(review): because the stream-ID check only runs here, streams
	// rejected earlier (decode failure, tap refusal) or by the stream limit
	// below never advance maxStreamID, so a client could reuse their IDs
	// undetected. Hoisting the check naively would separate it from the state
	// check; presumably the GOAWAY path reads maxStreamID and changes state
	// under mu, so a stream could then be counted in maxStreamID yet dropped
	// here. Verify against outgoingGoAwayHandler before changing.
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		s.cancel()
		return false
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&cleanupStream{
			streamID: streamID,
			rst:      true,
			rstCode:  http2.ErrCodeRefusedStream,
			onWrite:  func() {},
		})
		s.cancel()
		return false
	}
	// Client-initiated streams must use odd, strictly increasing IDs.
	if streamID%2 != 1 || streamID <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
		s.cancel()
		return true
	}
	t.maxStreamID = streamID
	t.activeStreams[streamID] = s
	// First active stream: the connection is no longer idle.
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	if channelz.IsOn() {
		atomic.AddInt64(&t.czData.streamsStarted, 1)
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
	}
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
			Header:      metadata.MD(state.data.mdata).Copy(),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	// ctxDone is captured only after all context wrapping above is complete.
	s.ctxDone = s.ctx.Done()
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:        s.ctx,
			ctxDone:    s.ctxDone,
			recv:       s.buf,
			freeBuffer: t.bufferPool.put,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	// Register the stream with loopy.
	t.controlBuf.put(&registerStream{
		streamID: s.id,
		wq:       s.wq,
	})
	handle(s)
	return false
}
450
451// HandleStreams receives incoming streams using the given handler. This is
452// typically run in a separate goroutine.
453// traceCtx attaches trace to ctx and returns the new context.
454func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
455	defer close(t.readerDone)
456	for {
457		t.controlBuf.throttle()
458		frame, err := t.framer.fr.ReadFrame()
459		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
460		if err != nil {
461			if se, ok := err.(http2.StreamError); ok {
462				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
463				t.mu.Lock()
464				s := t.activeStreams[se.StreamID]
465				t.mu.Unlock()
466				if s != nil {
467					t.closeStream(s, true, se.Code, false)
468				} else {
469					t.controlBuf.put(&cleanupStream{
470						streamID: se.StreamID,
471						rst:      true,
472						rstCode:  se.Code,
473						onWrite:  func() {},
474					})
475				}
476				continue
477			}
478			if err == io.EOF || err == io.ErrUnexpectedEOF {
479				t.Close()
480				return
481			}
482			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
483			t.Close()
484			return
485		}
486		switch frame := frame.(type) {
487		case *http2.MetaHeadersFrame:
488			if t.operateHeaders(frame, handle, traceCtx) {
489				t.Close()
490				break
491			}
492		case *http2.DataFrame:
493			t.handleData(frame)
494		case *http2.RSTStreamFrame:
495			t.handleRSTStream(frame)
496		case *http2.SettingsFrame:
497			t.handleSettings(frame)
498		case *http2.PingFrame:
499			t.handlePing(frame)
500		case *http2.WindowUpdateFrame:
501			t.handleWindowUpdate(frame)
502		case *http2.GoAwayFrame:
503			// TODO: Handle GoAway from the client appropriately.
504		default:
505			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
506		}
507	}
508}
509
510func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
511	t.mu.Lock()
512	defer t.mu.Unlock()
513	if t.activeStreams == nil {
514		// The transport is closing.
515		return nil, false
516	}
517	s, ok := t.activeStreams[f.Header().StreamID]
518	if !ok {
519		// The stream is already done.
520		return nil, false
521	}
522	return s, true
523}
524
525// adjustWindow sends out extra window update over the initial window size
526// of stream if the application is requesting data larger in size than
527// the window.
528func (t *http2Server) adjustWindow(s *Stream, n uint32) {
529	if w := s.fc.maybeAdjust(n); w > 0 {
530		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
531	}
532
533}
534
535// updateWindow adjusts the inbound quota for the stream and the transport.
536// Window updates will deliver to the controller for sending when
537// the cumulative quota exceeds the corresponding threshold.
538func (t *http2Server) updateWindow(s *Stream, n uint32) {
539	if w := s.fc.onRead(n); w > 0 {
540		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
541			increment: w,
542		})
543	}
544}
545
546// updateFlowControl updates the incoming flow control windows
547// for the transport and the stream based on the current bdp
548// estimation.
549func (t *http2Server) updateFlowControl(n uint32) {
550	t.mu.Lock()
551	for _, s := range t.activeStreams {
552		s.fc.newLimit(n)
553	}
554	t.initialWindowSize = int32(n)
555	t.mu.Unlock()
556	t.controlBuf.put(&outgoingWindowUpdate{
557		streamID:  0,
558		increment: t.fc.newLimit(n),
559	})
560	t.controlBuf.put(&outgoingSettings{
561		ss: []http2.Setting{
562			{
563				ID:  http2.SettingInitialWindowSize,
564				Val: n,
565			},
566		},
567	})
568
569}
570
571func (t *http2Server) handleData(f *http2.DataFrame) {
572	size := f.Header().Length
573	var sendBDPPing bool
574	if t.bdpEst != nil {
575		sendBDPPing = t.bdpEst.add(size)
576	}
577	// Decouple connection's flow control from application's read.
578	// An update on connection's flow control should not depend on
579	// whether user application has read the data or not. Such a
580	// restriction is already imposed on the stream's flow control,
581	// and therefore the sender will be blocked anyways.
582	// Decoupling the connection flow control will prevent other
583	// active(fast) streams from starving in presence of slow or
584	// inactive streams.
585	if w := t.fc.onData(size); w > 0 {
586		t.controlBuf.put(&outgoingWindowUpdate{
587			streamID:  0,
588			increment: w,
589		})
590	}
591	if sendBDPPing {
592		// Avoid excessive ping detection (e.g. in an L7 proxy)
593		// by sending a window update prior to the BDP ping.
594		if w := t.fc.reset(); w > 0 {
595			t.controlBuf.put(&outgoingWindowUpdate{
596				streamID:  0,
597				increment: w,
598			})
599		}
600		t.controlBuf.put(bdpPing)
601	}
602	// Select the right stream to dispatch.
603	s, ok := t.getStream(f)
604	if !ok {
605		return
606	}
607	if size > 0 {
608		if err := s.fc.onData(size); err != nil {
609			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
610			return
611		}
612		if f.Header().Flags.Has(http2.FlagDataPadded) {
613			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
614				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
615			}
616		}
617		// TODO(bradfitz, zhaoq): A copy is required here because there is no
618		// guarantee f.Data() is consumed before the arrival of next frame.
619		// Can this copy be eliminated?
620		if len(f.Data()) > 0 {
621			buffer := t.bufferPool.get()
622			buffer.Reset()
623			buffer.Write(f.Data())
624			s.write(recvMsg{buffer: buffer})
625		}
626	}
627	if f.Header().Flags.Has(http2.FlagDataEndStream) {
628		// Received the end of stream from the client.
629		s.compareAndSwapState(streamActive, streamReadDone)
630		s.write(recvMsg{err: io.EOF})
631	}
632}
633
634func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
635	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
636	if s, ok := t.getStream(f); ok {
637		t.closeStream(s, false, 0, false)
638		return
639	}
640	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
641	t.controlBuf.put(&cleanupStream{
642		streamID: f.Header().StreamID,
643		rst:      false,
644		rstCode:  0,
645		onWrite:  func() {},
646	})
647}
648
649func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
650	if f.IsAck() {
651		return
652	}
653	var ss []http2.Setting
654	var updateFuncs []func()
655	f.ForeachSetting(func(s http2.Setting) error {
656		switch s.ID {
657		case http2.SettingMaxHeaderListSize:
658			updateFuncs = append(updateFuncs, func() {
659				t.maxSendHeaderListSize = new(uint32)
660				*t.maxSendHeaderListSize = s.Val
661			})
662		default:
663			ss = append(ss, s)
664		}
665		return nil
666	})
667	t.controlBuf.executeAndPut(func(interface{}) bool {
668		for _, f := range updateFuncs {
669			f()
670		}
671		return true
672	}, &incomingSettings{
673		ss: ss,
674	})
675}
676
677const (
678	maxPingStrikes     = 2
679	defaultPingTimeout = 2 * time.Hour
680)
681
682func (t *http2Server) handlePing(f *http2.PingFrame) {
683	if f.IsAck() {
684		if f.Data == goAwayPing.data && t.drainChan != nil {
685			close(t.drainChan)
686			return
687		}
688		// Maybe it's a BDP ping.
689		if t.bdpEst != nil {
690			t.bdpEst.calculate(f.Data)
691		}
692		return
693	}
694	pingAck := &ping{ack: true}
695	copy(pingAck.data[:], f.Data[:])
696	t.controlBuf.put(pingAck)
697
698	now := time.Now()
699	defer func() {
700		t.lastPingAt = now
701	}()
702	// A reset ping strikes means that we don't need to check for policy
703	// violation for this ping and the pingStrikes counter should be set
704	// to 0.
705	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
706		t.pingStrikes = 0
707		return
708	}
709	t.mu.Lock()
710	ns := len(t.activeStreams)
711	t.mu.Unlock()
712	if ns < 1 && !t.kep.PermitWithoutStream {
713		// Keepalive shouldn't be active thus, this new ping should
714		// have come after at least defaultPingTimeout.
715		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
716			t.pingStrikes++
717		}
718	} else {
719		// Check if keepalive policy is respected.
720		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
721			t.pingStrikes++
722		}
723	}
724
725	if t.pingStrikes > maxPingStrikes {
726		// Send goaway and close the connection.
727		errorf("transport: Got too many pings from the client, closing the connection.")
728		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
729	}
730}
731
732func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
733	t.controlBuf.put(&incomingWindowUpdate{
734		streamID:  f.Header().StreamID,
735		increment: f.Increment,
736	})
737}
738
739func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
740	for k, vv := range md {
741		if isReservedHeader(k) {
742			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
743			continue
744		}
745		for _, v := range vv {
746			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
747		}
748	}
749	return headerFields
750}
751
752func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
753	if t.maxSendHeaderListSize == nil {
754		return true
755	}
756	hdrFrame := it.(*headerFrame)
757	var sz int64
758	for _, f := range hdrFrame.hf {
759		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
760			errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
761			return false
762		}
763	}
764	return true
765}
766
767// WriteHeader sends the header metadata md back to the client.
768func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
769	if s.updateHeaderSent() || s.getState() == streamDone {
770		return ErrIllegalHeaderWrite
771	}
772	s.hdrMu.Lock()
773	if md.Len() > 0 {
774		if s.header.Len() > 0 {
775			s.header = metadata.Join(s.header, md)
776		} else {
777			s.header = md
778		}
779	}
780	if err := t.writeHeaderLocked(s); err != nil {
781		s.hdrMu.Unlock()
782		return err
783	}
784	s.hdrMu.Unlock()
785	return nil
786}
787
788func (t *http2Server) setResetPingStrikes() {
789	atomic.StoreUint32(&t.resetPingStrikes, 1)
790}
791
792func (t *http2Server) writeHeaderLocked(s *Stream) error {
793	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
794	// first and create a slice of that exact size.
795	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
796	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
797	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
798	if s.sendCompress != "" {
799		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
800	}
801	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
802	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
803		streamID:  s.id,
804		hf:        headerFields,
805		endStream: false,
806		onWrite:   t.setResetPingStrikes,
807	})
808	if !success {
809		if err != nil {
810			return err
811		}
812		t.closeStream(s, true, http2.ErrCodeInternal, false)
813		return ErrHeaderListSizeLimitViolation
814	}
815	if t.stats != nil {
816		// Note: WireLength is not set in outHeader.
817		// TODO(mmukhi): Revisit this later, if needed.
818		outHeader := &stats.OutHeader{
819			Header: s.header.Copy(),
820		}
821		t.stats.HandleRPC(s.Context(), outHeader)
822	}
823	return nil
824}
825
826// WriteStatus sends stream status to the client and terminates the stream.
827// There is no further I/O operations being able to perform on this stream.
828// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
829// OK is adopted.
830func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
831	if s.getState() == streamDone {
832		return nil
833	}
834	s.hdrMu.Lock()
835	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
836	// first and create a slice of that exact size.
837	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
838	if !s.updateHeaderSent() {                      // No headers have been sent.
839		if len(s.header) > 0 { // Send a separate header frame.
840			if err := t.writeHeaderLocked(s); err != nil {
841				s.hdrMu.Unlock()
842				return err
843			}
844		} else { // Send a trailer only response.
845			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
846			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
847		}
848	}
849	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
850	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
851
852	if p := statusRawProto(st); p != nil && len(p.Details) > 0 {
853		stBytes, err := proto.Marshal(p)
854		if err != nil {
855			// TODO: return error instead, when callers are able to handle it.
856			grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
857		} else {
858			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
859		}
860	}
861
862	// Attach the trailer metadata.
863	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
864	trailingHeader := &headerFrame{
865		streamID:  s.id,
866		hf:        headerFields,
867		endStream: true,
868		onWrite:   t.setResetPingStrikes,
869	}
870	s.hdrMu.Unlock()
871	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
872	if !success {
873		if err != nil {
874			return err
875		}
876		t.closeStream(s, true, http2.ErrCodeInternal, false)
877		return ErrHeaderListSizeLimitViolation
878	}
879	// Send a RST_STREAM after the trailers if the client has not already half-closed.
880	rst := s.getState() == streamActive
881	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
882	if t.stats != nil {
883		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
884			Trailer: s.trailer.Copy(),
885		})
886	}
887	return nil
888}
889
890// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
891// is returns if it fails (e.g., framing error, transport error).
892func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
893	if !s.isHeaderSent() { // Headers haven't been written yet.
894		if err := t.WriteHeader(s, nil); err != nil {
895			if _, ok := err.(ConnectionError); ok {
896				return err
897			}
898			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
899			return status.Errorf(codes.Internal, "transport: %v", err)
900		}
901	} else {
902		// Writing headers checks for this condition.
903		if s.getState() == streamDone {
904			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
905			s.cancel()
906			select {
907			case <-t.done:
908				return ErrConnClosing
909			default:
910			}
911			return ContextErr(s.ctx.Err())
912		}
913	}
914	// Add some data to header frame so that we can equally distribute bytes across frames.
915	emptyLen := http2MaxFrameLen - len(hdr)
916	if emptyLen > len(data) {
917		emptyLen = len(data)
918	}
919	hdr = append(hdr, data[:emptyLen]...)
920	data = data[emptyLen:]
921	df := &dataFrame{
922		streamID:    s.id,
923		h:           hdr,
924		d:           data,
925		onEachWrite: t.setResetPingStrikes,
926	}
927	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
928		select {
929		case <-t.done:
930			return ErrConnClosing
931		default:
932		}
933		return ContextErr(s.ctx.Err())
934	}
935	return t.controlBuf.put(df)
936}
937
// keepalive enforces the server keepalive policy configured in t.kp until the
// transport is done (t.done closes) or one of the rules below closes it. It is
// meant to run in a separate goroutine and does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// "Idle" means t.idle is non-zero. deleteStream sets it to the current time
// when the last active stream is removed. It is presumably reset to zero when
// a new stream is added, which happens outside this chunk (TODO confirm).
// Liveness is judged by read activity: t.lastRead is a UnixNano timestamp,
// updated elsewhere. If it advances past prevNano, the peer is alive and any
// outstanding ping is forgotten.
func (t *http2Server) keepalive() {
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// Time left until the connection has been idle for MaxConnectionIdle.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				return
			}
			// Idle, but not for long enough yet: wake up exactly when it would be.
			idleTimer.Reset(val)
		case <-ageTimer.C:
			// MaxConnectionAge reached: start a graceful drain, then give
			// in-flight work MaxConnectionAgeGrace before closing forcibly.
			// ageTimer.C was just drained by this case, so Reset is safe.
			t.drain(http2.ErrCodeNo, []byte{})
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				infof("transport: closing server transport due to maximum connection age.")
				t.Close()
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// A ping is outstanding and its whole kp.Timeout budget has been
			// used up without any read activity: the peer is unresponsive.
			if outstandingPing && kpTimeoutLeft <= 0 {
				infof("transport: closing server transport due to idleness.")
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// kpTimeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1033
1034// Close starts shutting down the http2Server transport.
1035// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1036// could cause some resource issue. Revisit this later.
1037func (t *http2Server) Close() error {
1038	t.mu.Lock()
1039	if t.state == closing {
1040		t.mu.Unlock()
1041		return errors.New("transport: Close() was already called")
1042	}
1043	t.state = closing
1044	streams := t.activeStreams
1045	t.activeStreams = nil
1046	t.mu.Unlock()
1047	t.controlBuf.finish()
1048	close(t.done)
1049	err := t.conn.Close()
1050	if channelz.IsOn() {
1051		channelz.RemoveEntry(t.channelzID)
1052	}
1053	// Cancel all active streams.
1054	for _, s := range streams {
1055		s.cancel()
1056	}
1057	if t.stats != nil {
1058		connEnd := &stats.ConnEnd{}
1059		t.stats.HandleConn(t.ctx, connEnd)
1060	}
1061	return err
1062}
1063
1064// deleteStream deletes the stream s from transport's active streams.
1065func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1066	// In case stream sending and receiving are invoked in separate
1067	// goroutines (e.g., bi-directional streaming), cancel needs to be
1068	// called to interrupt the potential blocking on other goroutines.
1069	s.cancel()
1070
1071	t.mu.Lock()
1072	if _, ok := t.activeStreams[s.id]; ok {
1073		delete(t.activeStreams, s.id)
1074		if len(t.activeStreams) == 0 {
1075			t.idle = time.Now()
1076		}
1077	}
1078	t.mu.Unlock()
1079
1080	if channelz.IsOn() {
1081		if eosReceived {
1082			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1083		} else {
1084			atomic.AddInt64(&t.czData.streamsFailed, 1)
1085		}
1086	}
1087}
1088
1089// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1090func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1091	oldState := s.swapState(streamDone)
1092	if oldState == streamDone {
1093		// If the stream was already done, return.
1094		return
1095	}
1096
1097	hdr.cleanup = &cleanupStream{
1098		streamID: s.id,
1099		rst:      rst,
1100		rstCode:  rstCode,
1101		onWrite: func() {
1102			t.deleteStream(s, eosReceived)
1103		},
1104	}
1105	t.controlBuf.put(hdr)
1106}
1107
1108// closeStream clears the footprint of a stream when the stream is not needed any more.
1109func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1110	s.swapState(streamDone)
1111	t.deleteStream(s, eosReceived)
1112
1113	t.controlBuf.put(&cleanupStream{
1114		streamID: s.id,
1115		rst:      rst,
1116		rstCode:  rstCode,
1117		onWrite:  func() {},
1118	})
1119}
1120
// RemoteAddr returns the network address of the remote peer of this
// transport, as stored in t.remoteAddr.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}
1124
// Drain starts a graceful shutdown of the transport by calling drain with
// http2.ErrCodeNo and empty debug data. drain only acts the first time it
// runs, so repeated calls have no further effect.
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}
1128
1129func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1130	t.mu.Lock()
1131	defer t.mu.Unlock()
1132	if t.drainChan != nil {
1133		return
1134	}
1135	t.drainChan = make(chan struct{})
1136	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1137}
1138
// goAwayPing is the ping written right after the first (heads-up) GoAway in
// outgoingGoAwayHandler. Its fixed, distinctive payload presumably lets the
// ping-ack handler (outside this chunk) recognize the ack and close
// t.drainChan. NOTE(review): confirm against the ping-ack handling code.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1140
// outgoingGoAwayHandler writes the frame(s) for an outgoing goAway item and
// returns true if loopy needs to put itself in draining mode.
//
// It returns (false, ErrConnClosing) if the transport is already closing.
// Any error from writing a frame is returned as-is.
//
// A heads-up goAway (as queued by drain) starts a graceful close. It writes a
// GoAway with stream ID MaxUint32 followed by goAwayPing. It then starts a
// goroutine that, once t.drainChan is closed or one minute passes, queues a
// second, non-heads-up goAway with the same code and debug data. If t.done
// closes first, that goroutine exits without queuing anything. The call itself
// returns (false, nil).
//
// A non-heads-up goAway stops accepting new streams (state = draining) and
// writes a GoAway whose last stream ID is t.maxStreamID. If no streams are
// active, or the sender already set g.closeConn, it flushes the framer and
// returns an error so the connection is closed. Otherwise it returns
// (true, nil).
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	// Snapshot the highest stream ID while holding t.mu.
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter).  But flush out what's inside the buffer first.
			// NOTE(review): the error returned by Flush is discarded here.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	// t.drainChan is expected to be closed elsewhere (presumably when the
	// ack for goAwayPing arrives); the one-minute timer bounds the wait.
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1195
1196func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1197	s := channelz.SocketInternalMetric{
1198		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1199		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1200		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1201		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1202		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1203		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1204		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1205		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1206		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1207		LocalFlowControlWindow:           int64(t.fc.getSize()),
1208		SocketOptions:                    channelz.GetSocketOption(t.conn),
1209		LocalAddr:                        t.localAddr,
1210		RemoteAddr:                       t.remoteAddr,
1211		// RemoteName :
1212	}
1213	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1214		s.Security = au.GetSecurityValue()
1215	}
1216	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1217	return &s
1218}
1219
1220func (t *http2Server) IncrMsgSent() {
1221	atomic.AddInt64(&t.czData.msgSent, 1)
1222	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1223}
1224
1225func (t *http2Server) IncrMsgRecv() {
1226	atomic.AddInt64(&t.czData.msgRecv, 1)
1227	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1228}
1229
1230func (t *http2Server) getOutFlowWindow() int64 {
1231	resp := make(chan uint32, 1)
1232	timer := time.NewTimer(time.Second)
1233	defer timer.Stop()
1234	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1235	select {
1236	case sz := <-resp:
1237		return int64(sz)
1238	case <-t.done:
1239		return -1
1240	case <-timer.C:
1241		return -2
1242	}
1243}
1244
1245func getJitter(v time.Duration) time.Duration {
1246	if v == infinity {
1247		return 0
1248	}
1249	// Generate a jitter between +/- 10% of the value.
1250	r := int64(v / 10)
1251	j := grpcrand.Int63n(2*r) - r
1252	return time.Duration(j)
1253}
1254