1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"context"
24	"errors"
25	"fmt"
26	"io"
27	"math"
28	"net"
29	"net/http"
30	"strconv"
31	"sync"
32	"sync/atomic"
33	"time"
34
35	"github.com/golang/protobuf/proto"
36	"golang.org/x/net/http2"
37	"golang.org/x/net/http2/hpack"
38	"google.golang.org/grpc/internal/grpcutil"
39
40	"google.golang.org/grpc/codes"
41	"google.golang.org/grpc/credentials"
42	"google.golang.org/grpc/internal/channelz"
43	"google.golang.org/grpc/internal/grpcrand"
44	"google.golang.org/grpc/keepalive"
45	"google.golang.org/grpc/metadata"
46	"google.golang.org/grpc/peer"
47	"google.golang.org/grpc/stats"
48	"google.golang.org/grpc/status"
49	"google.golang.org/grpc/tap"
50)
51
// ErrIllegalHeaderWrite is returned when response headers can no longer be
// written, either because the stream has already finished or because its
// headers were already sent.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")

// ErrHeaderListSizeLimitViolation is returned when an outgoing header list
// would exceed the maximum header list size advertised by the peer.
var ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
60
var (
	// serverConnectionCounter tallies the connections this process has served:
	// it is bumped once for every http2Server that gets constructed. Read and
	// write it only through the sync/atomic functions.
	serverConnectionCounter uint64
)
64
// http2Server implements the ServerTransport interface with HTTP2.
//
// An http2Server serves a single net.Conn. Incoming frames are consumed by
// HandleStreams; outgoing frames and control items are queued on controlBuf
// and written by the loopyWriter goroutine started in newHTTP2Server.
type http2Server struct {
	// lastRead holds the UnixNano timestamp of the most recently read frame.
	// It must remain the first field so that 64-bit atomic operations on it
	// are correctly aligned on 32-bit platforms.
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx         context.Context
	done        chan struct{}
	conn        net.Conn
	loopy       *loopyWriter
	readerDone  chan struct{} // sync point to enable testing.
	writerDone  chan struct{} // sync point to enable testing.
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	framer      *framer
	// The max number of concurrent streams; math.MaxUint32 when the server
	// config sets no limit.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the loopyWriter.
	controlBuf *controlBuffer
	fc         *trInFlow
	stats      stats.Handler
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters
	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time the last non-ACK ping was received from the client.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes      uint32 // Accessed atomically.
	initialWindowSize     int32
	bdpEst                *bdpEstimator
	maxSendHeaderListSize *uint32

	// mu guards the fields declared below it. operateHeaders also updates
	// maxStreamID, and updateFlowControl updates initialWindowSize, while
	// holding mu.
	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time.
	idle time.Time

	// channelzID and czData are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData
	// bufferPool recycles the buffers that incoming DATA payloads are copied
	// into before being delivered to a stream.
	bufferPool *bufferPool

	// connectionID is a per-process sequence number taken from
	// serverConnectionCounter when the transport is constructed.
	connectionID uint64
}
127
128// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
129// returned if something goes wrong.
130func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
131	writeBufSize := config.WriteBufferSize
132	readBufSize := config.ReadBufferSize
133	maxHeaderListSize := defaultServerMaxHeaderListSize
134	if config.MaxHeaderListSize != nil {
135		maxHeaderListSize = *config.MaxHeaderListSize
136	}
137	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
138	// Send initial settings as connection preface to client.
139	isettings := []http2.Setting{{
140		ID:  http2.SettingMaxFrameSize,
141		Val: http2MaxFrameLen,
142	}}
143	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
144	// permitted in the HTTP2 spec.
145	maxStreams := config.MaxStreams
146	if maxStreams == 0 {
147		maxStreams = math.MaxUint32
148	} else {
149		isettings = append(isettings, http2.Setting{
150			ID:  http2.SettingMaxConcurrentStreams,
151			Val: maxStreams,
152		})
153	}
154	dynamicWindow := true
155	iwz := int32(initialWindowSize)
156	if config.InitialWindowSize >= defaultWindowSize {
157		iwz = config.InitialWindowSize
158		dynamicWindow = false
159	}
160	icwz := int32(initialWindowSize)
161	if config.InitialConnWindowSize >= defaultWindowSize {
162		icwz = config.InitialConnWindowSize
163		dynamicWindow = false
164	}
165	if iwz != defaultWindowSize {
166		isettings = append(isettings, http2.Setting{
167			ID:  http2.SettingInitialWindowSize,
168			Val: uint32(iwz)})
169	}
170	if config.MaxHeaderListSize != nil {
171		isettings = append(isettings, http2.Setting{
172			ID:  http2.SettingMaxHeaderListSize,
173			Val: *config.MaxHeaderListSize,
174		})
175	}
176	if config.HeaderTableSize != nil {
177		isettings = append(isettings, http2.Setting{
178			ID:  http2.SettingHeaderTableSize,
179			Val: *config.HeaderTableSize,
180		})
181	}
182	if err := framer.fr.WriteSettings(isettings...); err != nil {
183		return nil, connectionErrorf(false, err, "transport: %v", err)
184	}
185	// Adjust the connection flow control window if needed.
186	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
187		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
188			return nil, connectionErrorf(false, err, "transport: %v", err)
189		}
190	}
191	kp := config.KeepaliveParams
192	if kp.MaxConnectionIdle == 0 {
193		kp.MaxConnectionIdle = defaultMaxConnectionIdle
194	}
195	if kp.MaxConnectionAge == 0 {
196		kp.MaxConnectionAge = defaultMaxConnectionAge
197	}
198	// Add a jitter to MaxConnectionAge.
199	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
200	if kp.MaxConnectionAgeGrace == 0 {
201		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
202	}
203	if kp.Time == 0 {
204		kp.Time = defaultServerKeepaliveTime
205	}
206	if kp.Timeout == 0 {
207		kp.Timeout = defaultServerKeepaliveTimeout
208	}
209	kep := config.KeepalivePolicy
210	if kep.MinTime == 0 {
211		kep.MinTime = defaultKeepalivePolicyMinTime
212	}
213	done := make(chan struct{})
214	t := &http2Server{
215		ctx:               context.Background(),
216		done:              done,
217		conn:              conn,
218		remoteAddr:        conn.RemoteAddr(),
219		localAddr:         conn.LocalAddr(),
220		authInfo:          config.AuthInfo,
221		framer:            framer,
222		readerDone:        make(chan struct{}),
223		writerDone:        make(chan struct{}),
224		maxStreams:        maxStreams,
225		inTapHandle:       config.InTapHandle,
226		fc:                &trInFlow{limit: uint32(icwz)},
227		state:             reachable,
228		activeStreams:     make(map[uint32]*Stream),
229		stats:             config.StatsHandler,
230		kp:                kp,
231		idle:              time.Now(),
232		kep:               kep,
233		initialWindowSize: iwz,
234		czData:            new(channelzData),
235		bufferPool:        newBufferPool(),
236	}
237	t.controlBuf = newControlBuffer(t.done)
238	if dynamicWindow {
239		t.bdpEst = &bdpEstimator{
240			bdp:               initialWindowSize,
241			updateFlowControl: t.updateFlowControl,
242		}
243	}
244	if t.stats != nil {
245		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
246			RemoteAddr: t.remoteAddr,
247			LocalAddr:  t.localAddr,
248		})
249		connBegin := &stats.ConnBegin{}
250		t.stats.HandleConn(t.ctx, connBegin)
251	}
252	if channelz.IsOn() {
253		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
254	}
255
256	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
257
258	t.framer.writer.Flush()
259
260	defer func() {
261		if err != nil {
262			t.Close()
263		}
264	}()
265
266	// Check the validity of client preface.
267	preface := make([]byte, len(clientPreface))
268	if _, err := io.ReadFull(t.conn, preface); err != nil {
269		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
270	}
271	if !bytes.Equal(preface, clientPreface) {
272		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
273	}
274
275	frame, err := t.framer.fr.ReadFrame()
276	if err == io.EOF || err == io.ErrUnexpectedEOF {
277		return nil, err
278	}
279	if err != nil {
280		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
281	}
282	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
283	sf, ok := frame.(*http2.SettingsFrame)
284	if !ok {
285		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
286	}
287	t.handleSettings(sf)
288
289	go func() {
290		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
291		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
292		if err := t.loopy.run(); err != nil {
293			if logger.V(logLevel) {
294				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
295			}
296		}
297		t.conn.Close()
298		close(t.writerDone)
299	}()
300	go t.keepalive()
301	return t, nil
302}
303
304// operateHeader takes action on the decoded headers.
305func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
306	streamID := frame.Header().StreamID
307	state := &decodeState{
308		serverSide: true,
309	}
310	if h2code, err := state.decodeHeader(frame); err != nil {
311		if _, ok := status.FromError(err); ok {
312			t.controlBuf.put(&cleanupStream{
313				streamID: streamID,
314				rst:      true,
315				rstCode:  h2code,
316				onWrite:  func() {},
317			})
318		}
319		return false
320	}
321
322	buf := newRecvBuffer()
323	s := &Stream{
324		id:             streamID,
325		st:             t,
326		buf:            buf,
327		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
328		recvCompress:   state.data.encoding,
329		method:         state.data.method,
330		contentSubtype: state.data.contentSubtype,
331	}
332	if frame.StreamEnded() {
333		// s is just created by the caller. No lock needed.
334		s.state = streamReadDone
335	}
336	if state.data.timeoutSet {
337		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
338	} else {
339		s.ctx, s.cancel = context.WithCancel(t.ctx)
340	}
341	pr := &peer.Peer{
342		Addr: t.remoteAddr,
343	}
344	// Attach Auth info if there is any.
345	if t.authInfo != nil {
346		pr.AuthInfo = t.authInfo
347	}
348	s.ctx = peer.NewContext(s.ctx, pr)
349	// Attach the received metadata to the context.
350	if len(state.data.mdata) > 0 {
351		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
352	}
353	if state.data.statsTags != nil {
354		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
355	}
356	if state.data.statsTrace != nil {
357		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
358	}
359	t.mu.Lock()
360	if t.state != reachable {
361		t.mu.Unlock()
362		s.cancel()
363		return false
364	}
365	if uint32(len(t.activeStreams)) >= t.maxStreams {
366		t.mu.Unlock()
367		t.controlBuf.put(&cleanupStream{
368			streamID: streamID,
369			rst:      true,
370			rstCode:  http2.ErrCodeRefusedStream,
371			onWrite:  func() {},
372		})
373		s.cancel()
374		return false
375	}
376	if streamID%2 != 1 || streamID <= t.maxStreamID {
377		t.mu.Unlock()
378		// illegal gRPC stream id.
379		if logger.V(logLevel) {
380			logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
381		}
382		s.cancel()
383		return true
384	}
385	t.maxStreamID = streamID
386	if state.data.httpMethod != http.MethodPost {
387		t.mu.Unlock()
388		if logger.V(logLevel) {
389			logger.Warningf("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", state.data.httpMethod)
390		}
391		t.controlBuf.put(&cleanupStream{
392			streamID: streamID,
393			rst:      true,
394			rstCode:  http2.ErrCodeProtocol,
395			onWrite:  func() {},
396		})
397		s.cancel()
398		return false
399	}
400	if t.inTapHandle != nil {
401		var err error
402		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: state.data.method}); err != nil {
403			t.mu.Unlock()
404			if logger.V(logLevel) {
405				logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
406			}
407			stat, ok := status.FromError(err)
408			if !ok {
409				stat = status.New(codes.PermissionDenied, err.Error())
410			}
411			t.controlBuf.put(&earlyAbortStream{
412				streamID:       s.id,
413				contentSubtype: s.contentSubtype,
414				status:         stat,
415			})
416			return false
417		}
418	}
419	t.activeStreams[streamID] = s
420	if len(t.activeStreams) == 1 {
421		t.idle = time.Time{}
422	}
423	t.mu.Unlock()
424	if channelz.IsOn() {
425		atomic.AddInt64(&t.czData.streamsStarted, 1)
426		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
427	}
428	s.requestRead = func(n int) {
429		t.adjustWindow(s, uint32(n))
430	}
431	s.ctx = traceCtx(s.ctx, s.method)
432	if t.stats != nil {
433		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
434		inHeader := &stats.InHeader{
435			FullMethod:  s.method,
436			RemoteAddr:  t.remoteAddr,
437			LocalAddr:   t.localAddr,
438			Compression: s.recvCompress,
439			WireLength:  int(frame.Header().Length),
440			Header:      metadata.MD(state.data.mdata).Copy(),
441		}
442		t.stats.HandleRPC(s.ctx, inHeader)
443	}
444	s.ctxDone = s.ctx.Done()
445	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
446	s.trReader = &transportReader{
447		reader: &recvBufferReader{
448			ctx:        s.ctx,
449			ctxDone:    s.ctxDone,
450			recv:       s.buf,
451			freeBuffer: t.bufferPool.put,
452		},
453		windowHandler: func(n int) {
454			t.updateWindow(s, uint32(n))
455		},
456	}
457	// Register the stream with loopy.
458	t.controlBuf.put(&registerStream{
459		streamID: s.id,
460		wq:       s.wq,
461	})
462	handle(s)
463	return false
464}
465
466// HandleStreams receives incoming streams using the given handler. This is
467// typically run in a separate goroutine.
468// traceCtx attaches trace to ctx and returns the new context.
469func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
470	defer close(t.readerDone)
471	for {
472		t.controlBuf.throttle()
473		frame, err := t.framer.fr.ReadFrame()
474		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
475		if err != nil {
476			if se, ok := err.(http2.StreamError); ok {
477				if logger.V(logLevel) {
478					logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
479				}
480				t.mu.Lock()
481				s := t.activeStreams[se.StreamID]
482				t.mu.Unlock()
483				if s != nil {
484					t.closeStream(s, true, se.Code, false)
485				} else {
486					t.controlBuf.put(&cleanupStream{
487						streamID: se.StreamID,
488						rst:      true,
489						rstCode:  se.Code,
490						onWrite:  func() {},
491					})
492				}
493				continue
494			}
495			if err == io.EOF || err == io.ErrUnexpectedEOF {
496				t.Close()
497				return
498			}
499			if logger.V(logLevel) {
500				logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
501			}
502			t.Close()
503			return
504		}
505		switch frame := frame.(type) {
506		case *http2.MetaHeadersFrame:
507			if t.operateHeaders(frame, handle, traceCtx) {
508				t.Close()
509				break
510			}
511		case *http2.DataFrame:
512			t.handleData(frame)
513		case *http2.RSTStreamFrame:
514			t.handleRSTStream(frame)
515		case *http2.SettingsFrame:
516			t.handleSettings(frame)
517		case *http2.PingFrame:
518			t.handlePing(frame)
519		case *http2.WindowUpdateFrame:
520			t.handleWindowUpdate(frame)
521		case *http2.GoAwayFrame:
522			// TODO: Handle GoAway from the client appropriately.
523		default:
524			if logger.V(logLevel) {
525				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
526			}
527		}
528	}
529}
530
531func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
532	t.mu.Lock()
533	defer t.mu.Unlock()
534	if t.activeStreams == nil {
535		// The transport is closing.
536		return nil, false
537	}
538	s, ok := t.activeStreams[f.Header().StreamID]
539	if !ok {
540		// The stream is already done.
541		return nil, false
542	}
543	return s, true
544}
545
546// adjustWindow sends out extra window update over the initial window size
547// of stream if the application is requesting data larger in size than
548// the window.
549func (t *http2Server) adjustWindow(s *Stream, n uint32) {
550	if w := s.fc.maybeAdjust(n); w > 0 {
551		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
552	}
553
554}
555
556// updateWindow adjusts the inbound quota for the stream and the transport.
557// Window updates will deliver to the controller for sending when
558// the cumulative quota exceeds the corresponding threshold.
559func (t *http2Server) updateWindow(s *Stream, n uint32) {
560	if w := s.fc.onRead(n); w > 0 {
561		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
562			increment: w,
563		})
564	}
565}
566
567// updateFlowControl updates the incoming flow control windows
568// for the transport and the stream based on the current bdp
569// estimation.
570func (t *http2Server) updateFlowControl(n uint32) {
571	t.mu.Lock()
572	for _, s := range t.activeStreams {
573		s.fc.newLimit(n)
574	}
575	t.initialWindowSize = int32(n)
576	t.mu.Unlock()
577	t.controlBuf.put(&outgoingWindowUpdate{
578		streamID:  0,
579		increment: t.fc.newLimit(n),
580	})
581	t.controlBuf.put(&outgoingSettings{
582		ss: []http2.Setting{
583			{
584				ID:  http2.SettingInitialWindowSize,
585				Val: n,
586			},
587		},
588	})
589
590}
591
592func (t *http2Server) handleData(f *http2.DataFrame) {
593	size := f.Header().Length
594	var sendBDPPing bool
595	if t.bdpEst != nil {
596		sendBDPPing = t.bdpEst.add(size)
597	}
598	// Decouple connection's flow control from application's read.
599	// An update on connection's flow control should not depend on
600	// whether user application has read the data or not. Such a
601	// restriction is already imposed on the stream's flow control,
602	// and therefore the sender will be blocked anyways.
603	// Decoupling the connection flow control will prevent other
604	// active(fast) streams from starving in presence of slow or
605	// inactive streams.
606	if w := t.fc.onData(size); w > 0 {
607		t.controlBuf.put(&outgoingWindowUpdate{
608			streamID:  0,
609			increment: w,
610		})
611	}
612	if sendBDPPing {
613		// Avoid excessive ping detection (e.g. in an L7 proxy)
614		// by sending a window update prior to the BDP ping.
615		if w := t.fc.reset(); w > 0 {
616			t.controlBuf.put(&outgoingWindowUpdate{
617				streamID:  0,
618				increment: w,
619			})
620		}
621		t.controlBuf.put(bdpPing)
622	}
623	// Select the right stream to dispatch.
624	s, ok := t.getStream(f)
625	if !ok {
626		return
627	}
628	if s.getState() == streamReadDone {
629		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
630		return
631	}
632	if size > 0 {
633		if err := s.fc.onData(size); err != nil {
634			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
635			return
636		}
637		if f.Header().Flags.Has(http2.FlagDataPadded) {
638			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
639				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
640			}
641		}
642		// TODO(bradfitz, zhaoq): A copy is required here because there is no
643		// guarantee f.Data() is consumed before the arrival of next frame.
644		// Can this copy be eliminated?
645		if len(f.Data()) > 0 {
646			buffer := t.bufferPool.get()
647			buffer.Reset()
648			buffer.Write(f.Data())
649			s.write(recvMsg{buffer: buffer})
650		}
651	}
652	if f.Header().Flags.Has(http2.FlagDataEndStream) {
653		// Received the end of stream from the client.
654		s.compareAndSwapState(streamActive, streamReadDone)
655		s.write(recvMsg{err: io.EOF})
656	}
657}
658
659func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
660	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
661	if s, ok := t.getStream(f); ok {
662		t.closeStream(s, false, 0, false)
663		return
664	}
665	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
666	t.controlBuf.put(&cleanupStream{
667		streamID: f.Header().StreamID,
668		rst:      false,
669		rstCode:  0,
670		onWrite:  func() {},
671	})
672}
673
674func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
675	if f.IsAck() {
676		return
677	}
678	var ss []http2.Setting
679	var updateFuncs []func()
680	f.ForeachSetting(func(s http2.Setting) error {
681		switch s.ID {
682		case http2.SettingMaxHeaderListSize:
683			updateFuncs = append(updateFuncs, func() {
684				t.maxSendHeaderListSize = new(uint32)
685				*t.maxSendHeaderListSize = s.Val
686			})
687		default:
688			ss = append(ss, s)
689		}
690		return nil
691	})
692	t.controlBuf.executeAndPut(func(interface{}) bool {
693		for _, f := range updateFuncs {
694			f()
695		}
696		return true
697	}, &incomingSettings{
698		ss: ss,
699	})
700}
701
// maxPingStrikes is the number of keepalive-policy violations tolerated from
// a client; one more strike makes handlePing send GOAWAY and close the
// connection.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing expected between client pings
// while there are no active streams and the policy does not permit
// keepalive without streams.
const defaultPingTimeout = 2 * time.Hour
706
707func (t *http2Server) handlePing(f *http2.PingFrame) {
708	if f.IsAck() {
709		if f.Data == goAwayPing.data && t.drainChan != nil {
710			close(t.drainChan)
711			return
712		}
713		// Maybe it's a BDP ping.
714		if t.bdpEst != nil {
715			t.bdpEst.calculate(f.Data)
716		}
717		return
718	}
719	pingAck := &ping{ack: true}
720	copy(pingAck.data[:], f.Data[:])
721	t.controlBuf.put(pingAck)
722
723	now := time.Now()
724	defer func() {
725		t.lastPingAt = now
726	}()
727	// A reset ping strikes means that we don't need to check for policy
728	// violation for this ping and the pingStrikes counter should be set
729	// to 0.
730	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
731		t.pingStrikes = 0
732		return
733	}
734	t.mu.Lock()
735	ns := len(t.activeStreams)
736	t.mu.Unlock()
737	if ns < 1 && !t.kep.PermitWithoutStream {
738		// Keepalive shouldn't be active thus, this new ping should
739		// have come after at least defaultPingTimeout.
740		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
741			t.pingStrikes++
742		}
743	} else {
744		// Check if keepalive policy is respected.
745		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
746			t.pingStrikes++
747		}
748	}
749
750	if t.pingStrikes > maxPingStrikes {
751		// Send goaway and close the connection.
752		if logger.V(logLevel) {
753			logger.Errorf("transport: Got too many pings from the client, closing the connection.")
754		}
755		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
756	}
757}
758
759func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
760	t.controlBuf.put(&incomingWindowUpdate{
761		streamID:  f.Header().StreamID,
762		increment: f.Increment,
763	})
764}
765
766func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
767	for k, vv := range md {
768		if isReservedHeader(k) {
769			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
770			continue
771		}
772		for _, v := range vv {
773			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
774		}
775	}
776	return headerFields
777}
778
779func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
780	if t.maxSendHeaderListSize == nil {
781		return true
782	}
783	hdrFrame := it.(*headerFrame)
784	var sz int64
785	for _, f := range hdrFrame.hf {
786		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
787			if logger.V(logLevel) {
788				logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
789			}
790			return false
791		}
792	}
793	return true
794}
795
796// WriteHeader sends the header metadata md back to the client.
797func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
798	if s.updateHeaderSent() || s.getState() == streamDone {
799		return ErrIllegalHeaderWrite
800	}
801	s.hdrMu.Lock()
802	if md.Len() > 0 {
803		if s.header.Len() > 0 {
804			s.header = metadata.Join(s.header, md)
805		} else {
806			s.header = md
807		}
808	}
809	if err := t.writeHeaderLocked(s); err != nil {
810		s.hdrMu.Unlock()
811		return err
812	}
813	s.hdrMu.Unlock()
814	return nil
815}
816
817func (t *http2Server) setResetPingStrikes() {
818	atomic.StoreUint32(&t.resetPingStrikes, 1)
819}
820
821func (t *http2Server) writeHeaderLocked(s *Stream) error {
822	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
823	// first and create a slice of that exact size.
824	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
825	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
826	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
827	if s.sendCompress != "" {
828		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
829	}
830	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
831	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
832		streamID:  s.id,
833		hf:        headerFields,
834		endStream: false,
835		onWrite:   t.setResetPingStrikes,
836	})
837	if !success {
838		if err != nil {
839			return err
840		}
841		t.closeStream(s, true, http2.ErrCodeInternal, false)
842		return ErrHeaderListSizeLimitViolation
843	}
844	if t.stats != nil {
845		// Note: Headers are compressed with hpack after this call returns.
846		// No WireLength field is set here.
847		outHeader := &stats.OutHeader{
848			Header:      s.header.Copy(),
849			Compression: s.sendCompress,
850		}
851		t.stats.HandleRPC(s.Context(), outHeader)
852	}
853	return nil
854}
855
856// WriteStatus sends stream status to the client and terminates the stream.
857// There is no further I/O operations being able to perform on this stream.
858// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
859// OK is adopted.
860func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
861	if s.getState() == streamDone {
862		return nil
863	}
864	s.hdrMu.Lock()
865	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
866	// first and create a slice of that exact size.
867	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
868	if !s.updateHeaderSent() {                      // No headers have been sent.
869		if len(s.header) > 0 { // Send a separate header frame.
870			if err := t.writeHeaderLocked(s); err != nil {
871				s.hdrMu.Unlock()
872				return err
873			}
874		} else { // Send a trailer only response.
875			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
876			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
877		}
878	}
879	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
880	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
881
882	if p := st.Proto(); p != nil && len(p.Details) > 0 {
883		stBytes, err := proto.Marshal(p)
884		if err != nil {
885			// TODO: return error instead, when callers are able to handle it.
886			logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
887		} else {
888			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
889		}
890	}
891
892	// Attach the trailer metadata.
893	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
894	trailingHeader := &headerFrame{
895		streamID:  s.id,
896		hf:        headerFields,
897		endStream: true,
898		onWrite:   t.setResetPingStrikes,
899	}
900	s.hdrMu.Unlock()
901	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
902	if !success {
903		if err != nil {
904			return err
905		}
906		t.closeStream(s, true, http2.ErrCodeInternal, false)
907		return ErrHeaderListSizeLimitViolation
908	}
909	// Send a RST_STREAM after the trailers if the client has not already half-closed.
910	rst := s.getState() == streamActive
911	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
912	if t.stats != nil {
913		// Note: The trailer fields are compressed with hpack after this call returns.
914		// No WireLength field is set here.
915		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
916			Trailer: s.trailer.Copy(),
917		})
918	}
919	return nil
920}
921
922// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
923// is returns if it fails (e.g., framing error, transport error).
924func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
925	if !s.isHeaderSent() { // Headers haven't been written yet.
926		if err := t.WriteHeader(s, nil); err != nil {
927			if _, ok := err.(ConnectionError); ok {
928				return err
929			}
930			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
931			return status.Errorf(codes.Internal, "transport: %v", err)
932		}
933	} else {
934		// Writing headers checks for this condition.
935		if s.getState() == streamDone {
936			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
937			s.cancel()
938			select {
939			case <-t.done:
940				return ErrConnClosing
941			default:
942			}
943			return ContextErr(s.ctx.Err())
944		}
945	}
946	df := &dataFrame{
947		streamID:    s.id,
948		h:           hdr,
949		d:           data,
950		onEachWrite: t.setResetPingStrikes,
951	}
952	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
953		select {
954		case <-t.done:
955			return ErrConnClosing
956		default:
957		}
958		return ContextErr(s.ctx.Err())
959	}
960	return t.controlBuf.put(df)
961}
962
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// It returns when t.done is closed, after draining an idle connection, after
// the MaxConnectionAge handling completes, or after closing a connection that
// failed to respond to a keepalive ping.
func (t *http2Server) keepalive() {
	// A single zero-payload ping frame is reused for every keepalive ping.
	p := &ping{}
	// True iff a ping has been sent, and no data has been received since then.
	outstandingPing := false
	// Amount of time remaining before which we should receive an ACK for the
	// last sent ping.
	kpTimeoutLeft := time.Duration(0)
	// Records the last value of t.lastRead before we go block on the timer.
	// This is required to check for read activity since then.
	prevNano := time.Now().UnixNano()
	// Initialize the different timers to their default values.
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
	kpTimer := time.NewTimer(t.kp.Time)
	defer func() {
		// We need to drain the underlying channel in these timers after a call
		// to Stop(), only if we are interested in resetting them. Clearly we
		// are not interested in resetting them here.
		idleTimer.Stop()
		ageTimer.Stop()
		kpTimer.Stop()
	}()

	for {
		select {
		case <-idleTimer.C:
			t.mu.Lock()
			// t.idle holds the time the last active stream was removed; a zero
			// value means the connection currently has active streams.
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				idleTimer.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				return
			}
			// Idle, but not for long enough yet: wake up again exactly when
			// the idle window would expire.
			idleTimer.Reset(val)
		case <-ageTimer.C:
			// Start a graceful (heads-up) GoAway first, then give in-flight
			// RPCs MaxConnectionAgeGrace to finish. Reset is safe here because
			// the timer's channel was just received from.
			t.drain(http2.ErrCodeNo, []byte{})
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-ageTimer.C:
				// Close the connection after grace period.
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to maximum connection age.")
				}
				t.Close()
			case <-t.done:
			}
			return
		case <-kpTimer.C:
			lastRead := atomic.LoadInt64(&t.lastRead)
			if lastRead > prevNano {
				// There has been read activity since the last time we were
				// here. Setup the timer to fire at kp.Time seconds from
				// lastRead time and continue.
				outstandingPing = false
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
				prevNano = lastRead
				continue
			}
			// No reads since the last check. If a ping is already in flight
			// and its ack deadline has passed, treat the peer as dead.
			if outstandingPing && kpTimeoutLeft <= 0 {
				if logger.V(logLevel) {
					logger.Infof("transport: closing server transport due to idleness.")
				}
				t.Close()
				return
			}
			if !outstandingPing {
				if channelz.IsOn() {
					atomic.AddInt64(&t.czData.kpCount, 1)
				}
				t.controlBuf.put(p)
				kpTimeoutLeft = t.kp.Timeout
				outstandingPing = true
			}
			// The amount of time to sleep here is the minimum of kp.Time and
			// timeoutLeft. This will ensure that we wait only for kp.Time
			// before sending out the next ping (for cases where the ping is
			// acked).
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
			kpTimeoutLeft -= sleepDuration
			kpTimer.Reset(sleepDuration)
		case <-t.done:
			return
		}
	}
}
1062
1063// Close starts shutting down the http2Server transport.
1064// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1065// could cause some resource issue. Revisit this later.
1066func (t *http2Server) Close() error {
1067	t.mu.Lock()
1068	if t.state == closing {
1069		t.mu.Unlock()
1070		return errors.New("transport: Close() was already called")
1071	}
1072	t.state = closing
1073	streams := t.activeStreams
1074	t.activeStreams = nil
1075	t.mu.Unlock()
1076	t.controlBuf.finish()
1077	close(t.done)
1078	err := t.conn.Close()
1079	if channelz.IsOn() {
1080		channelz.RemoveEntry(t.channelzID)
1081	}
1082	// Cancel all active streams.
1083	for _, s := range streams {
1084		s.cancel()
1085	}
1086	if t.stats != nil {
1087		connEnd := &stats.ConnEnd{}
1088		t.stats.HandleConn(t.ctx, connEnd)
1089	}
1090	return err
1091}
1092
1093// deleteStream deletes the stream s from transport's active streams.
1094func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1095	// In case stream sending and receiving are invoked in separate
1096	// goroutines (e.g., bi-directional streaming), cancel needs to be
1097	// called to interrupt the potential blocking on other goroutines.
1098	s.cancel()
1099
1100	t.mu.Lock()
1101	if _, ok := t.activeStreams[s.id]; ok {
1102		delete(t.activeStreams, s.id)
1103		if len(t.activeStreams) == 0 {
1104			t.idle = time.Now()
1105		}
1106	}
1107	t.mu.Unlock()
1108
1109	if channelz.IsOn() {
1110		if eosReceived {
1111			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1112		} else {
1113			atomic.AddInt64(&t.czData.streamsFailed, 1)
1114		}
1115	}
1116}
1117
1118// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1119func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1120	oldState := s.swapState(streamDone)
1121	if oldState == streamDone {
1122		// If the stream was already done, return.
1123		return
1124	}
1125
1126	hdr.cleanup = &cleanupStream{
1127		streamID: s.id,
1128		rst:      rst,
1129		rstCode:  rstCode,
1130		onWrite: func() {
1131			t.deleteStream(s, eosReceived)
1132		},
1133	}
1134	t.controlBuf.put(hdr)
1135}
1136
1137// closeStream clears the footprint of a stream when the stream is not needed any more.
1138func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1139	s.swapState(streamDone)
1140	t.deleteStream(s, eosReceived)
1141
1142	t.controlBuf.put(&cleanupStream{
1143		streamID: s.id,
1144		rst:      rst,
1145		rstCode:  rstCode,
1146		onWrite:  func() {},
1147	})
1148}
1149
1150func (t *http2Server) RemoteAddr() net.Addr {
1151	return t.remoteAddr
1152}
1153
1154func (t *http2Server) Drain() {
1155	t.drain(http2.ErrCodeNo, []byte{})
1156}
1157
1158func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1159	t.mu.Lock()
1160	defer t.mu.Unlock()
1161	if t.drainChan != nil {
1162		return
1163	}
1164	t.drainChan = make(chan struct{})
1165	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1166}
1167
// goAwayPing is the PING frame written by outgoingGoAwayHandler right after
// the heads-up GoAway (last-stream-ID MaxUint32) during graceful shutdown.
// Its fixed, non-zero payload presumably lets the ack be told apart from
// keepalive pings, which carry an all-zero payload.
// NOTE(review): confirm against the ping-ack handling elsewhere in this file.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1169
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
//
// There are two cases:
//   - Heads-up GoAway (g.headsUp): writes a GoAway with last-stream-ID
//     MaxUint32 followed by goAwayPing. It then starts a goroutine that
//     enqueues the final (non-heads-up) GoAway with the same code and debug
//     data once t.drainChan is closed or one minute elapses. The goroutine
//     enqueues nothing if t.done closes first. Returns (false, nil) on
//     success.
//   - Final GoAway: moves the transport to draining and writes a GoAway with
//     t.maxStreamID. If no streams are active, it flushes the writer and
//     returns an error so the connection is closed. Otherwise it returns
//     (true, nil).
//
// Frame write errors are returned as-is. If the transport is already closing,
// it returns ErrConnClosing.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	sid := t.maxStreamID
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		if len(t.activeStreams) == 0 {
			g.closeConn = true
		}
		t.mu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if g.closeConn {
			// Abruptly close the connection following the GoAway (via
			// loopywriter), but flush out what's inside the buffer first.
			t.framer.writer.Flush()
			return false, fmt.Errorf("transport: Connection closing")
		}
		return true, nil
	}
	t.mu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		// t.drainChan was created by drain() before this heads-up GoAway was
		// enqueued. NOTE(review): it is presumably closed by the ping-ack
		// handler when goAwayPing is acknowledged; confirm.
		select {
		case <-t.drainChan:
		case <-timer.C:
		case <-t.done:
			return
		}
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
1224
1225func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1226	s := channelz.SocketInternalMetric{
1227		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1228		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1229		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1230		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1231		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1232		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1233		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1234		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1235		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1236		LocalFlowControlWindow:           int64(t.fc.getSize()),
1237		SocketOptions:                    channelz.GetSocketOption(t.conn),
1238		LocalAddr:                        t.localAddr,
1239		RemoteAddr:                       t.remoteAddr,
1240		// RemoteName :
1241	}
1242	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1243		s.Security = au.GetSecurityValue()
1244	}
1245	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1246	return &s
1247}
1248
1249func (t *http2Server) IncrMsgSent() {
1250	atomic.AddInt64(&t.czData.msgSent, 1)
1251	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1252}
1253
1254func (t *http2Server) IncrMsgRecv() {
1255	atomic.AddInt64(&t.czData.msgRecv, 1)
1256	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1257}
1258
1259func (t *http2Server) getOutFlowWindow() int64 {
1260	resp := make(chan uint32, 1)
1261	timer := time.NewTimer(time.Second)
1262	defer timer.Stop()
1263	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1264	select {
1265	case sz := <-resp:
1266		return int64(sz)
1267	case <-t.done:
1268		return -1
1269	case <-timer.C:
1270		return -2
1271	}
1272}
1273
1274func getJitter(v time.Duration) time.Duration {
1275	if v == infinity {
1276		return 0
1277	}
1278	// Generate a jitter between +/- 10% of the value.
1279	r := int64(v / 10)
1280	j := grpcrand.Int63n(2*r) - r
1281	return time.Duration(j)
1282}
1283