1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"errors"
24	"io"
25	"math"
26	"math/rand"
27	"net"
28	"strconv"
29	"sync"
30	"sync/atomic"
31	"time"
32
33	"github.com/golang/protobuf/proto"
34	"golang.org/x/net/context"
35	"golang.org/x/net/http2"
36	"golang.org/x/net/http2/hpack"
37	"google.golang.org/grpc/codes"
38	"google.golang.org/grpc/credentials"
39	"google.golang.org/grpc/keepalive"
40	"google.golang.org/grpc/metadata"
41	"google.golang.org/grpc/peer"
42	"google.golang.org/grpc/stats"
43	"google.golang.org/grpc/status"
44	"google.golang.org/grpc/tap"
45)
46
// ErrIllegalHeaderWrite reports that a header write was rejected because the
// stream is already done or its header has already been written.
var (
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
)
50
// http2Server implements the ServerTransport interface with HTTP2.
// Instances are constructed by newHTTP2Server around a single net.Conn.
type http2Server struct {
	// ctx is the connection-level context; the context of every stream is
	// derived from it.
	ctx         context.Context
	conn        net.Conn
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	// inTapHandle, when non-nil, is invoked for every new stream before it is
	// registered; an error from it causes the stream to be refused.
	inTapHandle tap.ServerInHandle
	// writableChan synchronizes write access to the transport.
	// A writer acquires the write lock by receiving a value on writableChan
	// and releases it by sending on writableChan.
	writableChan chan int
	// shutdownChan is closed when Close is called.
	// Blocking operations should select on shutdownChan to avoid
	// blocking forever after Close.
	shutdownChan chan struct{}
	framer       *framer
	hBuf         *bytes.Buffer  // the buffer for HPACK encoding
	hEnc         *hpack.Encoder // HPACK encoder
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	// fc is the connection-level inbound flow control state.
	fc *inFlow
	// sendQuotaPool provides flow control to outbound message.
	sendQuotaPool *quotaPool
	// stats, when non-nil, receives connection and RPC stats events.
	stats stats.Handler
	// Flag to keep track of reading activity on transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes uint32 // Accessed atomically.
	// initialWindowSize is the inbound flow control limit given to each newly
	// created stream; it is rewritten by updateFlowControl.
	initialWindowSize int32
	// bdpEst is only set when neither window size was configured explicitly
	// (dynamic window mode); it is nil otherwise.
	bdpEst *bdpEstimator

	// outQuotaVersion is bumped every time the peer changes its initial
	// window size, so that a writer can detect that the quota it acquired
	// is stale. Accessed atomically.
	outQuotaVersion uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
	// Then an independent goroutine will be launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
	// already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// the per-stream outbound flow control window size set by the peer.
	streamSendQuota uint32
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs go down to 0.
	// When the connection is busy, this value is set to the zero time.Time.
	idle time.Time
}
120
121// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
122// returned if something goes wrong.
123func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
124	framer := newFramer(conn)
125	// Send initial settings as connection preface to client.
126	var isettings []http2.Setting
127	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
128	// permitted in the HTTP2 spec.
129	maxStreams := config.MaxStreams
130	if maxStreams == 0 {
131		maxStreams = math.MaxUint32
132	} else {
133		isettings = append(isettings, http2.Setting{
134			ID:  http2.SettingMaxConcurrentStreams,
135			Val: maxStreams,
136		})
137	}
138	dynamicWindow := true
139	iwz := int32(initialWindowSize)
140	if config.InitialWindowSize >= defaultWindowSize {
141		iwz = config.InitialWindowSize
142		dynamicWindow = false
143	}
144	icwz := int32(initialWindowSize)
145	if config.InitialConnWindowSize >= defaultWindowSize {
146		icwz = config.InitialConnWindowSize
147		dynamicWindow = false
148	}
149	if iwz != defaultWindowSize {
150		isettings = append(isettings, http2.Setting{
151			ID:  http2.SettingInitialWindowSize,
152			Val: uint32(iwz)})
153	}
154	if err := framer.writeSettings(true, isettings...); err != nil {
155		return nil, connectionErrorf(true, err, "transport: %v", err)
156	}
157	// Adjust the connection flow control window if needed.
158	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
159		if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
160			return nil, connectionErrorf(true, err, "transport: %v", err)
161		}
162	}
163	kp := config.KeepaliveParams
164	if kp.MaxConnectionIdle == 0 {
165		kp.MaxConnectionIdle = defaultMaxConnectionIdle
166	}
167	if kp.MaxConnectionAge == 0 {
168		kp.MaxConnectionAge = defaultMaxConnectionAge
169	}
170	// Add a jitter to MaxConnectionAge.
171	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
172	if kp.MaxConnectionAgeGrace == 0 {
173		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
174	}
175	if kp.Time == 0 {
176		kp.Time = defaultServerKeepaliveTime
177	}
178	if kp.Timeout == 0 {
179		kp.Timeout = defaultServerKeepaliveTimeout
180	}
181	kep := config.KeepalivePolicy
182	if kep.MinTime == 0 {
183		kep.MinTime = defaultKeepalivePolicyMinTime
184	}
185	var buf bytes.Buffer
186	t := &http2Server{
187		ctx:               context.Background(),
188		conn:              conn,
189		remoteAddr:        conn.RemoteAddr(),
190		localAddr:         conn.LocalAddr(),
191		authInfo:          config.AuthInfo,
192		framer:            framer,
193		hBuf:              &buf,
194		hEnc:              hpack.NewEncoder(&buf),
195		maxStreams:        maxStreams,
196		inTapHandle:       config.InTapHandle,
197		controlBuf:        newControlBuffer(),
198		fc:                &inFlow{limit: uint32(icwz)},
199		sendQuotaPool:     newQuotaPool(defaultWindowSize),
200		state:             reachable,
201		writableChan:      make(chan int, 1),
202		shutdownChan:      make(chan struct{}),
203		activeStreams:     make(map[uint32]*Stream),
204		streamSendQuota:   defaultWindowSize,
205		stats:             config.StatsHandler,
206		kp:                kp,
207		idle:              time.Now(),
208		kep:               kep,
209		initialWindowSize: iwz,
210	}
211	if dynamicWindow {
212		t.bdpEst = &bdpEstimator{
213			bdp:               initialWindowSize,
214			updateFlowControl: t.updateFlowControl,
215		}
216	}
217	if t.stats != nil {
218		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
219			RemoteAddr: t.remoteAddr,
220			LocalAddr:  t.localAddr,
221		})
222		connBegin := &stats.ConnBegin{}
223		t.stats.HandleConn(t.ctx, connBegin)
224	}
225	go t.controller()
226	go t.keepalive()
227	t.writableChan <- 0
228	return t, nil
229}
230
231// operateHeader takes action on the decoded headers.
232func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
233	streamID := frame.Header().StreamID
234
235	var state decodeState
236	for _, hf := range frame.Fields {
237		if err := state.processHeaderField(hf); err != nil {
238			if se, ok := err.(StreamError); ok {
239				t.controlBuf.put(&resetStream{streamID, statusCodeConvTab[se.Code]})
240			}
241			return
242		}
243	}
244
245	buf := newRecvBuffer()
246	s := &Stream{
247		id:           streamID,
248		st:           t,
249		buf:          buf,
250		fc:           &inFlow{limit: uint32(t.initialWindowSize)},
251		recvCompress: state.encoding,
252		method:       state.method,
253	}
254
255	if frame.StreamEnded() {
256		// s is just created by the caller. No lock needed.
257		s.state = streamReadDone
258	}
259	if state.timeoutSet {
260		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
261	} else {
262		s.ctx, s.cancel = context.WithCancel(t.ctx)
263	}
264	pr := &peer.Peer{
265		Addr: t.remoteAddr,
266	}
267	// Attach Auth info if there is any.
268	if t.authInfo != nil {
269		pr.AuthInfo = t.authInfo
270	}
271	s.ctx = peer.NewContext(s.ctx, pr)
272	// Cache the current stream to the context so that the server application
273	// can find out. Required when the server wants to send some metadata
274	// back to the client (unary call only).
275	s.ctx = newContextWithStream(s.ctx, s)
276	// Attach the received metadata to the context.
277	if len(state.mdata) > 0 {
278		s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
279	}
280	if state.statsTags != nil {
281		s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
282	}
283	if state.statsTrace != nil {
284		s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
285	}
286	if t.inTapHandle != nil {
287		var err error
288		info := &tap.Info{
289			FullMethodName: state.method,
290		}
291		s.ctx, err = t.inTapHandle(s.ctx, info)
292		if err != nil {
293			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
294			t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
295			return
296		}
297	}
298	t.mu.Lock()
299	if t.state != reachable {
300		t.mu.Unlock()
301		return
302	}
303	if uint32(len(t.activeStreams)) >= t.maxStreams {
304		t.mu.Unlock()
305		t.controlBuf.put(&resetStream{streamID, http2.ErrCodeRefusedStream})
306		return
307	}
308	if streamID%2 != 1 || streamID <= t.maxStreamID {
309		t.mu.Unlock()
310		// illegal gRPC stream id.
311		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
312		return true
313	}
314	t.maxStreamID = streamID
315	s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
316	t.activeStreams[streamID] = s
317	if len(t.activeStreams) == 1 {
318		t.idle = time.Time{}
319	}
320	t.mu.Unlock()
321	s.requestRead = func(n int) {
322		t.adjustWindow(s, uint32(n))
323	}
324	s.ctx = traceCtx(s.ctx, s.method)
325	if t.stats != nil {
326		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
327		inHeader := &stats.InHeader{
328			FullMethod:  s.method,
329			RemoteAddr:  t.remoteAddr,
330			LocalAddr:   t.localAddr,
331			Compression: s.recvCompress,
332			WireLength:  int(frame.Header().Length),
333		}
334		t.stats.HandleRPC(s.ctx, inHeader)
335	}
336	s.trReader = &transportReader{
337		reader: &recvBufferReader{
338			ctx:  s.ctx,
339			recv: s.buf,
340		},
341		windowHandler: func(n int) {
342			t.updateWindow(s, uint32(n))
343		},
344	}
345	handle(s)
346	return
347}
348
349// HandleStreams receives incoming streams using the given handler. This is
350// typically run in a separate goroutine.
351// traceCtx attaches trace to ctx and returns the new context.
352func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
353	// Check the validity of client preface.
354	preface := make([]byte, len(clientPreface))
355	if _, err := io.ReadFull(t.conn, preface); err != nil {
356		// Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket)
357		if err != io.EOF {
358			errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
359		}
360		t.Close()
361		return
362	}
363	if !bytes.Equal(preface, clientPreface) {
364		errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
365		t.Close()
366		return
367	}
368
369	frame, err := t.framer.readFrame()
370	if err == io.EOF || err == io.ErrUnexpectedEOF {
371		t.Close()
372		return
373	}
374	if err != nil {
375		errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
376		t.Close()
377		return
378	}
379	atomic.StoreUint32(&t.activity, 1)
380	sf, ok := frame.(*http2.SettingsFrame)
381	if !ok {
382		errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
383		t.Close()
384		return
385	}
386	t.handleSettings(sf)
387
388	for {
389		frame, err := t.framer.readFrame()
390		atomic.StoreUint32(&t.activity, 1)
391		if err != nil {
392			if se, ok := err.(http2.StreamError); ok {
393				t.mu.Lock()
394				s := t.activeStreams[se.StreamID]
395				t.mu.Unlock()
396				if s != nil {
397					t.closeStream(s)
398				}
399				t.controlBuf.put(&resetStream{se.StreamID, se.Code})
400				continue
401			}
402			if err == io.EOF || err == io.ErrUnexpectedEOF {
403				t.Close()
404				return
405			}
406			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
407			t.Close()
408			return
409		}
410		switch frame := frame.(type) {
411		case *http2.MetaHeadersFrame:
412			if t.operateHeaders(frame, handle, traceCtx) {
413				t.Close()
414				break
415			}
416		case *http2.DataFrame:
417			t.handleData(frame)
418		case *http2.RSTStreamFrame:
419			t.handleRSTStream(frame)
420		case *http2.SettingsFrame:
421			t.handleSettings(frame)
422		case *http2.PingFrame:
423			t.handlePing(frame)
424		case *http2.WindowUpdateFrame:
425			t.handleWindowUpdate(frame)
426		case *http2.GoAwayFrame:
427			// TODO: Handle GoAway from the client appropriately.
428		default:
429			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
430		}
431	}
432}
433
434func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
435	t.mu.Lock()
436	defer t.mu.Unlock()
437	if t.activeStreams == nil {
438		// The transport is closing.
439		return nil, false
440	}
441	s, ok := t.activeStreams[f.Header().StreamID]
442	if !ok {
443		// The stream is already done.
444		return nil, false
445	}
446	return s, true
447}
448
449// adjustWindow sends out extra window update over the initial window size
450// of stream if the application is requesting data larger in size than
451// the window.
452func (t *http2Server) adjustWindow(s *Stream, n uint32) {
453	s.mu.Lock()
454	defer s.mu.Unlock()
455	if s.state == streamDone {
456		return
457	}
458	if w := s.fc.maybeAdjust(n); w > 0 {
459		if cw := t.fc.resetPendingUpdate(); cw > 0 {
460			t.controlBuf.put(&windowUpdate{0, cw, false})
461		}
462		t.controlBuf.put(&windowUpdate{s.id, w, true})
463	}
464}
465
466// updateWindow adjusts the inbound quota for the stream and the transport.
467// Window updates will deliver to the controller for sending when
468// the cumulative quota exceeds the corresponding threshold.
469func (t *http2Server) updateWindow(s *Stream, n uint32) {
470	s.mu.Lock()
471	defer s.mu.Unlock()
472	if s.state == streamDone {
473		return
474	}
475	if w := s.fc.onRead(n); w > 0 {
476		if cw := t.fc.resetPendingUpdate(); cw > 0 {
477			t.controlBuf.put(&windowUpdate{0, cw, false})
478		}
479		t.controlBuf.put(&windowUpdate{s.id, w, true})
480	}
481}
482
483// updateFlowControl updates the incoming flow control windows
484// for the transport and the stream based on the current bdp
485// estimation.
486func (t *http2Server) updateFlowControl(n uint32) {
487	t.mu.Lock()
488	for _, s := range t.activeStreams {
489		s.fc.newLimit(n)
490	}
491	t.initialWindowSize = int32(n)
492	t.mu.Unlock()
493	t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
494	t.controlBuf.put(&settings{
495		ack: false,
496		ss: []http2.Setting{
497			{
498				ID:  http2.SettingInitialWindowSize,
499				Val: uint32(n),
500			},
501		},
502	})
503
504}
505
506func (t *http2Server) handleData(f *http2.DataFrame) {
507	size := f.Header().Length
508	var sendBDPPing bool
509	if t.bdpEst != nil {
510		sendBDPPing = t.bdpEst.add(uint32(size))
511	}
512	// Decouple connection's flow control from application's read.
513	// An update on connection's flow control should not depend on
514	// whether user application has read the data or not. Such a
515	// restriction is already imposed on the stream's flow control,
516	// and therefore the sender will be blocked anyways.
517	// Decoupling the connection flow control will prevent other
518	// active(fast) streams from starving in presence of slow or
519	// inactive streams.
520	//
521	// Furthermore, if a bdpPing is being sent out we can piggyback
522	// connection's window update for the bytes we just received.
523	if sendBDPPing {
524		t.controlBuf.put(&windowUpdate{0, uint32(size), false})
525		t.controlBuf.put(bdpPing)
526	} else {
527		if err := t.fc.onData(uint32(size)); err != nil {
528			errorf("transport: http2Server %v", err)
529			t.Close()
530			return
531		}
532		if w := t.fc.onRead(uint32(size)); w > 0 {
533			t.controlBuf.put(&windowUpdate{0, w, true})
534		}
535	}
536	// Select the right stream to dispatch.
537	s, ok := t.getStream(f)
538	if !ok {
539		return
540	}
541	if size > 0 {
542		s.mu.Lock()
543		if s.state == streamDone {
544			s.mu.Unlock()
545			return
546		}
547		if err := s.fc.onData(uint32(size)); err != nil {
548			s.mu.Unlock()
549			t.closeStream(s)
550			t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
551			return
552		}
553		if f.Header().Flags.Has(http2.FlagDataPadded) {
554			if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
555				t.controlBuf.put(&windowUpdate{s.id, w, true})
556			}
557		}
558		s.mu.Unlock()
559		// TODO(bradfitz, zhaoq): A copy is required here because there is no
560		// guarantee f.Data() is consumed before the arrival of next frame.
561		// Can this copy be eliminated?
562		if len(f.Data()) > 0 {
563			data := make([]byte, len(f.Data()))
564			copy(data, f.Data())
565			s.write(recvMsg{data: data})
566		}
567	}
568	if f.Header().Flags.Has(http2.FlagDataEndStream) {
569		// Received the end of stream from the client.
570		s.mu.Lock()
571		if s.state != streamDone {
572			s.state = streamReadDone
573		}
574		s.mu.Unlock()
575		s.write(recvMsg{err: io.EOF})
576	}
577}
578
579func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
580	s, ok := t.getStream(f)
581	if !ok {
582		return
583	}
584	t.closeStream(s)
585}
586
587func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
588	if f.IsAck() {
589		return
590	}
591	var ss []http2.Setting
592	f.ForeachSetting(func(s http2.Setting) error {
593		ss = append(ss, s)
594		return nil
595	})
596	// The settings will be applied once the ack is sent.
597	t.controlBuf.put(&settings{ack: true, ss: ss})
598}
599
// maxPingStrikes is the number of keepalive policy violations handlePing
// tolerates; one more strike makes the server send a GOAWAY and close the
// connection.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing handlePing expects between pings
// while there are no active streams and pings without streams are not
// permitted.
const defaultPingTimeout = 2 * time.Hour
604
605func (t *http2Server) handlePing(f *http2.PingFrame) {
606	if f.IsAck() {
607		if f.Data == goAwayPing.data && t.drainChan != nil {
608			close(t.drainChan)
609			return
610		}
611		// Maybe it's a BDP ping.
612		if t.bdpEst != nil {
613			t.bdpEst.calculate(f.Data)
614		}
615		return
616	}
617	pingAck := &ping{ack: true}
618	copy(pingAck.data[:], f.Data[:])
619	t.controlBuf.put(pingAck)
620
621	now := time.Now()
622	defer func() {
623		t.lastPingAt = now
624	}()
625	// A reset ping strikes means that we don't need to check for policy
626	// violation for this ping and the pingStrikes counter should be set
627	// to 0.
628	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
629		t.pingStrikes = 0
630		return
631	}
632	t.mu.Lock()
633	ns := len(t.activeStreams)
634	t.mu.Unlock()
635	if ns < 1 && !t.kep.PermitWithoutStream {
636		// Keepalive shouldn't be active thus, this new ping should
637		// have come after atleast defaultPingTimeout.
638		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
639			t.pingStrikes++
640		}
641	} else {
642		// Check if keepalive policy is respected.
643		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
644			t.pingStrikes++
645		}
646	}
647
648	if t.pingStrikes > maxPingStrikes {
649		// Send goaway and close the connection.
650		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
651	}
652}
653
654func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
655	id := f.Header().StreamID
656	incr := f.Increment
657	if id == 0 {
658		t.sendQuotaPool.add(int(incr))
659		return
660	}
661	if s, ok := t.getStream(f); ok {
662		s.sendQuotaPool.add(int(incr))
663	}
664}
665
666func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
667	first := true
668	endHeaders := false
669	var err error
670	defer func() {
671		if err == nil {
672			// Reset ping strikes when seding headers since that might cause the
673			// peer to send ping.
674			atomic.StoreUint32(&t.resetPingStrikes, 1)
675		}
676	}()
677	// Sends the headers in a single batch.
678	for !endHeaders {
679		size := t.hBuf.Len()
680		if size > http2MaxFrameLen {
681			size = http2MaxFrameLen
682		} else {
683			endHeaders = true
684		}
685		if first {
686			p := http2.HeadersFrameParam{
687				StreamID:      s.id,
688				BlockFragment: b.Next(size),
689				EndStream:     endStream,
690				EndHeaders:    endHeaders,
691			}
692			err = t.framer.writeHeaders(endHeaders, p)
693			first = false
694		} else {
695			err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
696		}
697		if err != nil {
698			t.Close()
699			return connectionErrorf(true, err, "transport: %v", err)
700		}
701	}
702	return nil
703}
704
705// WriteHeader sends the header metedata md back to the client.
706func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
707	s.mu.Lock()
708	if s.headerOk || s.state == streamDone {
709		s.mu.Unlock()
710		return ErrIllegalHeaderWrite
711	}
712	s.headerOk = true
713	if md.Len() > 0 {
714		if s.header.Len() > 0 {
715			s.header = metadata.Join(s.header, md)
716		} else {
717			s.header = md
718		}
719	}
720	md = s.header
721	s.mu.Unlock()
722	if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
723		return err
724	}
725	t.hBuf.Reset()
726	t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
727	t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
728	if s.sendCompress != "" {
729		t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
730	}
731	for k, vv := range md {
732		if isReservedHeader(k) {
733			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
734			continue
735		}
736		for _, v := range vv {
737			t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
738		}
739	}
740	bufLen := t.hBuf.Len()
741	if err := t.writeHeaders(s, t.hBuf, false); err != nil {
742		return err
743	}
744	if t.stats != nil {
745		outHeader := &stats.OutHeader{
746			WireLength: bufLen,
747		}
748		t.stats.HandleRPC(s.Context(), outHeader)
749	}
750	t.writableChan <- 0
751	return nil
752}
753
754// WriteStatus sends stream status to the client and terminates the stream.
755// There is no further I/O operations being able to perform on this stream.
756// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
757// OK is adopted.
758func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
759	var headersSent, hasHeader bool
760	s.mu.Lock()
761	if s.state == streamDone {
762		s.mu.Unlock()
763		return nil
764	}
765	if s.headerOk {
766		headersSent = true
767	}
768	if s.header.Len() > 0 {
769		hasHeader = true
770	}
771	s.mu.Unlock()
772
773	if !headersSent && hasHeader {
774		t.WriteHeader(s, nil)
775		headersSent = true
776	}
777
778	// Always write a status regardless of context cancellation unless the stream
779	// is terminated (e.g. by a RST_STREAM, GOAWAY, or transport error).  The
780	// server's application code is already done so it is fine to ignore s.ctx.
781	select {
782	case <-t.shutdownChan:
783		return ErrConnClosing
784	case <-t.writableChan:
785	}
786	t.hBuf.Reset()
787	if !headersSent {
788		t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
789		t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
790	}
791	t.hEnc.WriteField(
792		hpack.HeaderField{
793			Name:  "grpc-status",
794			Value: strconv.Itoa(int(st.Code())),
795		})
796	t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
797
798	if p := st.Proto(); p != nil && len(p.Details) > 0 {
799		stBytes, err := proto.Marshal(p)
800		if err != nil {
801			// TODO: return error instead, when callers are able to handle it.
802			panic(err)
803		}
804
805		t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
806	}
807
808	// Attach the trailer metadata.
809	for k, vv := range s.trailer {
810		// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
811		if isReservedHeader(k) {
812			continue
813		}
814		for _, v := range vv {
815			t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
816		}
817	}
818	bufLen := t.hBuf.Len()
819	if err := t.writeHeaders(s, t.hBuf, true); err != nil {
820		t.Close()
821		return err
822	}
823	if t.stats != nil {
824		outTrailer := &stats.OutTrailer{
825			WireLength: bufLen,
826		}
827		t.stats.HandleRPC(s.Context(), outTrailer)
828	}
829	t.closeStream(s)
830	t.writableChan <- 0
831	return nil
832}
833
// Write converts the data into HTTP2 data frames and sends them out. A non-nil
// error is returned if it fails (e.g., framing error, transport error).
//
// hdr and data are sent back to back. The leading part of data is appended to
// hdr so that the first buffer ends on a frame boundary (or contains all of
// data); the remainder of data forms the second buffer. If no header frame
// has been written for the stream yet, one is written first.
//
// For every frame the method acquires stream quota, then transport quota,
// then the transport write lock. It returns a stream error when the stream is
// already done or its context ends, and a connection error (after closing the
// transport) when the frame cannot be written.
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) (err error) {
	// TODO(zhaoq): Support multi-writers for a single stream.
	// secondStart is the number of leading data bytes that are moved into
	// the first buffer: enough to fill it up to the next multiple of
	// http2MaxFrameLen, capped at len(data).
	secondStart := http2MaxFrameLen - len(hdr)%http2MaxFrameLen
	if len(data) < secondStart {
		secondStart = len(data)
	}
	hdr = append(hdr, data[:secondStart]...)
	data = data[secondStart:]
	// isLastSlice reports whether the buffer currently being drained is the
	// final one.
	isLastSlice := (len(data) == 0)
	var writeHeaderFrame bool
	s.mu.Lock()
	if s.state == streamDone {
		s.mu.Unlock()
		return streamErrorf(codes.Unknown, "the stream has been done")
	}
	if !s.headerOk {
		writeHeaderFrame = true
	}
	s.mu.Unlock()
	if writeHeaderFrame {
		// NOTE(review): the error returned by WriteHeader is discarded here.
		t.WriteHeader(s, nil)
	}
	r := bytes.NewBuffer(hdr)
	var (
		// p is the chunk taken from r that still has to be written; it is
		// kept across iterations when a write attempt has to be forsaken.
		p []byte
		// oqv is the outbound quota version observed before acquiring quota.
		oqv uint32
	)
	for {
		if r.Len() == 0 && p == nil {
			// Everything has been written.
			return nil
		}
		oqv = atomic.LoadUint32(&t.outQuotaVersion)
		size := http2MaxFrameLen
		// Wait until the stream has some quota to send the data.
		sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
		if err != nil {
			return err
		}
		// Wait until the transport has some quota to send the data.
		tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
		if err != nil {
			return err
		}
		// The frame is limited by the max frame length and by both quotas.
		if sq < size {
			size = sq
		}
		if tq < size {
			size = tq
		}
		if p == nil {
			p = r.Next(size)
		}
		ps := len(p)
		if ps < sq {
			// Overbooked stream quota. Return it back.
			s.sendQuotaPool.add(sq - ps)
		}
		if ps < tq {
			// Overbooked transport quota. Return it back.
			t.sendQuotaPool.add(tq - ps)
		}
		// Register as a pending writer so that flushing can be batched.
		t.framer.adjustNumWriters(1)
		// Got some quota. Try to acquire writing privilege on the
		// transport.
		if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
			if _, ok := err.(StreamError); ok {
				// Return the connection quota back.
				t.sendQuotaPool.add(ps)
			}
			if t.framer.adjustNumWriters(-1) == 0 {
				// This writer is the last one in this batch and has the
				// responsibility to flush the buffered frames. It queues
				// a flush request to controlBuf instead of flushing directly
				// in order to avoid the race with other writing or flushing.
				t.controlBuf.put(&flushIO{})
			}
			return err
		}
		// The write lock is held from here on. Check once more, without
		// blocking, whether the stream context ended in the meantime.
		select {
		case <-s.ctx.Done():
			// Return the connection quota, hand flushing over to the
			// controller if this was the last writer, release the write lock.
			t.sendQuotaPool.add(ps)
			if t.framer.adjustNumWriters(-1) == 0 {
				t.controlBuf.put(&flushIO{})
			}
			t.writableChan <- 0
			return ContextErr(s.ctx.Err())
		default:
		}
		if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
			// InitialWindowSize settings frame must have been received after we
			// acquired send quota but before we got the writable channel.
			// We must forsake this write.
			// Both quotas are returned; p stays set so the same chunk is
			// retried on the next iteration.
			t.sendQuotaPool.add(ps)
			s.sendQuotaPool.add(ps)
			if t.framer.adjustNumWriters(-1) == 0 {
				t.controlBuf.put(&flushIO{})
			}
			t.writableChan <- 0
			continue
		}
		var forceFlush bool
		if r.Len() == 0 {
			if isLastSlice {
				// Final frame of this message: flush right away if this is
				// the only pending writer and more messages may follow.
				if t.framer.adjustNumWriters(0) == 1 && !opts.Last {
					forceFlush = true
				}
			} else {
				// The first buffer is drained; continue with the rest of data.
				r = bytes.NewBuffer(data)
				isLastSlice = true
			}
		}
		// Reset ping strikes when sending data since this might cause
		// the peer to send ping.
		atomic.StoreUint32(&t.resetPingStrikes, 1)
		if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
			t.Close()
			return connectionErrorf(true, err, "transport: %v", err)
		}
		p = nil
		if t.framer.adjustNumWriters(-1) == 0 {
			// Last writer of the batch: flush while still holding the lock.
			t.framer.flushWrite()
		}
		// Release the transport write lock.
		t.writableChan <- 0
	}

}
962
963func (t *http2Server) applySettings(ss []http2.Setting) {
964	for _, s := range ss {
965		if s.ID == http2.SettingInitialWindowSize {
966			t.mu.Lock()
967			defer t.mu.Unlock()
968			for _, stream := range t.activeStreams {
969				stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
970			}
971			t.streamSendQuota = s.Val
972			atomic.AddUint32(&t.outQuotaVersion, 1)
973		}
974
975	}
976}
977
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
//
// The goroutine exits when the transport is shut down or after it has
// initiated one of the closes above.
func (t *http2Server) keepalive() {
	p := &ping{}
	// pingSent records that a keepalive ping is outstanding and no read
	// activity has been observed since it was sent.
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		// Stop reports false when the timer already fired; the receive then
		// drains the channel, which blocks if the value was consumed before.
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			// val is the idle time still allowed before the connection is
			// drained.
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			maxIdle.Reset(val)
		case <-maxAge.C:
			t.drain(http2.ErrCodeNo, []byte{})
			// Reuse the timer for the grace period that follows the drain.
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.shutdownChan:
			}
			return
		case <-keepalive.C:
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				// There was read activity since the last check; no ping is
				// needed.
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			if pingSent {
				// No activity since the ping was sent: give up on the
				// connection.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			pingSent = true
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.shutdownChan:
			return
		}
	}
}
1057
// goAwayPing is the ping the controller writes immediately after the first
// GoAway of a graceful close (the one carrying math.MaxUint32 as stream ID).
// NOTE(review): the ack for this payload is presumably what signals
// t.drainChan; that handling is not in this part of the file — confirm.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1059
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the peer.
//
// It takes one item at a time from t.controlBuf, acquires the transport's
// write lock by receiving from t.writableChan, writes the frame matching the
// item's type and releases the lock by sending on t.writableChan. It returns
// when t.shutdownChan is closed, or when a goAway item is processed while the
// transport is already in the closing state.
func (t *http2Server) controller() {
	for {
		select {
		case i := <-t.controlBuf.get():
			// NOTE(review): load presumably makes the next queued item
			// available on get(); controlBuf is defined elsewhere — confirm.
			t.controlBuf.load()
			select {
			case <-t.writableChan:
				// The write lock is held from here until the matching send
				// on t.writableChan below.
				switch i := i.(type) {
				case *windowUpdate:
					t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
				case *settings:
					if i.ack {
						t.framer.writeSettingsAck(true)
						t.applySettings(i.ss)
					} else {
						t.framer.writeSettings(true, i.ss...)
					}
				case *resetStream:
					t.framer.writeRSTStream(true, i.streamID, i.code)
				case *goAway:
					t.mu.Lock()
					if t.state == closing {
						t.mu.Unlock()
						// The transport is closing. The write lock is
						// intentionally not released on this path.
						return
					}
					sid := t.maxStreamID
					if !i.headsUp {
						// Stop accepting more streams now.
						t.state = draining
						activeStreams := len(t.activeStreams)
						t.mu.Unlock()
						t.framer.writeGoAway(true, sid, i.code, i.debugData)
						if i.closeConn || activeStreams == 0 {
							// Abruptly close the connection following the GoAway.
							t.Close()
						}
						// Release the write lock and go back to waiting for
						// the next control item.
						t.writableChan <- 0
						continue
					}
					t.mu.Unlock()
					// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
					// Follow that with a ping and wait for the ack to come back or a timer
					// to expire. During this time accept new streams since they might have
					// originated before the GoAway reaches the client.
					// After getting the ack or timer expiration send out another GoAway this
					// time with an ID of the max stream server intends to process.
					t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
					t.framer.writePing(true, false, goAwayPing.data)
					go func() {
						// Wait for t.drainChan, a one-minute timeout or
						// shutdown; unless the transport shut down, queue the
						// second (non-heads-up) GoAway.
						timer := time.NewTimer(time.Minute)
						defer timer.Stop()
						select {
						case <-t.drainChan:
						case <-timer.C:
						case <-t.shutdownChan:
							return
						}
						t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
					}()
				case *flushIO:
					t.framer.flushWrite()
				case *ping:
					if !i.ack {
						// Only pings originated by this side are reported to
						// the BDP estimator.
						t.bdpEst.timesnap(i.data)
					}
					t.framer.writePing(true, i.ack, i.data)
				default:
					errorf("transport: http2Server.controller got unexpected item type %v\n", i)
				}
				// Release the write lock.
				t.writableChan <- 0
				continue
			case <-t.shutdownChan:
				return
			}
		case <-t.shutdownChan:
			return
		}
	}
}
1142
1143// Close starts shutting down the http2Server transport.
1144// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1145// could cause some resource issue. Revisit this later.
1146func (t *http2Server) Close() (err error) {
1147	t.mu.Lock()
1148	if t.state == closing {
1149		t.mu.Unlock()
1150		return errors.New("transport: Close() was already called")
1151	}
1152	t.state = closing
1153	streams := t.activeStreams
1154	t.activeStreams = nil
1155	t.mu.Unlock()
1156	close(t.shutdownChan)
1157	err = t.conn.Close()
1158	// Cancel all active streams.
1159	for _, s := range streams {
1160		s.cancel()
1161	}
1162	if t.stats != nil {
1163		connEnd := &stats.ConnEnd{}
1164		t.stats.HandleConn(t.ctx, connEnd)
1165	}
1166	return
1167}
1168
1169// closeStream clears the footprint of a stream when the stream is not needed
1170// any more.
1171func (t *http2Server) closeStream(s *Stream) {
1172	t.mu.Lock()
1173	delete(t.activeStreams, s.id)
1174	if len(t.activeStreams) == 0 {
1175		t.idle = time.Now()
1176	}
1177	if t.state == draining && len(t.activeStreams) == 0 {
1178		defer t.Close()
1179	}
1180	t.mu.Unlock()
1181	// In case stream sending and receiving are invoked in separate
1182	// goroutines (e.g., bi-directional streaming), cancel needs to be
1183	// called to interrupt the potential blocking on other goroutines.
1184	s.cancel()
1185	s.mu.Lock()
1186	if s.state == streamDone {
1187		s.mu.Unlock()
1188		return
1189	}
1190	s.state = streamDone
1191	s.mu.Unlock()
1192}
1193
// RemoteAddr returns the remote network address stored on the transport.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}
1197
// Drain starts a graceful shutdown of the transport by calling drain with
// http2.ErrCodeNo and empty debug data.
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}
1201
1202func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1203	t.mu.Lock()
1204	defer t.mu.Unlock()
1205	if t.drainChan != nil {
1206		return
1207	}
1208	t.drainChan = make(chan struct{})
1209	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1210}
1211
1212var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
1213
1214func getJitter(v time.Duration) time.Duration {
1215	if v == infinity {
1216		return 0
1217	}
1218	// Generate a jitter between +/- 10% of the value.
1219	r := int64(v / 10)
1220	j := rgen.Int63n(2*r) - r
1221	return time.Duration(j)
1222}
1223