1/*
2 *
3 * Copyright 2014, Google Inc.
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are
8 * met:
9 *
10 *     * Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *     * Redistributions in binary form must reproduce the above
13 * copyright notice, this list of conditions and the following disclaimer
14 * in the documentation and/or other materials provided with the
15 * distribution.
16 *     * Neither the name of Google Inc. nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 *
32 */
33
34package transport
35
36import (
37	"bytes"
38	"errors"
39	"io"
40	"math"
41	"net"
42	"strconv"
43	"sync"
44
45	"golang.org/x/net/context"
46	"golang.org/x/net/http2"
47	"golang.org/x/net/http2/hpack"
48	"google.golang.org/grpc/codes"
49	"google.golang.org/grpc/credentials"
50	"google.golang.org/grpc/grpclog"
51	"google.golang.org/grpc/metadata"
52	"google.golang.org/grpc/peer"
53)
54
// ErrIllegalHeaderWrite indicates that setting header is illegal because of
// the stream's state: WriteHeader returns it when the stream is already done
// or when the header has already been claimed for the stream.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
58
// http2Server implements the ServerTransport interface with HTTP2.
// One http2Server wraps a single connection (conn) and keeps track of the
// streams that are currently active on it.
type http2Server struct {
	conn        net.Conn
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	// writableChan synchronizes write access to the transport.
	// A writer acquires the write lock by receiving a value on writableChan
	// and releases it by sending on writableChan.
	writableChan chan int
	// shutdownChan is closed when Close is called.
	// Blocking operations should select on shutdownChan to avoid
	// blocking forever after Close.
	shutdownChan chan struct{}
	framer       *framer
	hBuf         *bytes.Buffer  // the buffer for HPACK encoding
	hEnc         *hpack.Encoder // HPACK encoder; it writes into hBuf

	// The max number of concurrent streams. New streams arriving once this
	// many streams are active are refused with a stream reset.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *recvBuffer
	fc         *inFlow
	// sendQuotaPool provides flow control to outbound message.
	sendQuotaPool *quotaPool

	mu            sync.Mutex // guard the following
	state         transportState
	activeStreams map[uint32]*Stream // keyed by stream ID; set to nil by Close
	// the per-stream outbound flow control window size set by the peer.
	streamSendQuota uint32
}
91
92// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
93// returned if something goes wrong.
94func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
95	framer := newFramer(conn)
96	// Send initial settings as connection preface to client.
97	var settings []http2.Setting
98	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
99	// permitted in the HTTP2 spec.
100	if maxStreams == 0 {
101		maxStreams = math.MaxUint32
102	} else {
103		settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams})
104	}
105	if initialWindowSize != defaultWindowSize {
106		settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
107	}
108	if err := framer.writeSettings(true, settings...); err != nil {
109		return nil, ConnectionErrorf("transport: %v", err)
110	}
111	// Adjust the connection flow control window if needed.
112	if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
113		if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
114			return nil, ConnectionErrorf("transport: %v", err)
115		}
116	}
117	var buf bytes.Buffer
118	t := &http2Server{
119		conn:            conn,
120		authInfo:        authInfo,
121		framer:          framer,
122		hBuf:            &buf,
123		hEnc:            hpack.NewEncoder(&buf),
124		maxStreams:      maxStreams,
125		controlBuf:      newRecvBuffer(),
126		fc:              &inFlow{limit: initialConnWindowSize},
127		sendQuotaPool:   newQuotaPool(defaultWindowSize),
128		state:           reachable,
129		writableChan:    make(chan int, 1),
130		shutdownChan:    make(chan struct{}),
131		activeStreams:   make(map[uint32]*Stream),
132		streamSendQuota: defaultWindowSize,
133	}
134	go t.controller()
135	t.writableChan <- 0
136	return t, nil
137}
138
139// operateHeader takes action on the decoded headers.
140func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) {
141	buf := newRecvBuffer()
142	fc := &inFlow{
143		limit: initialWindowSize,
144		conn:  t.fc,
145	}
146	s := &Stream{
147		id:  frame.Header().StreamID,
148		st:  t,
149		buf: buf,
150		fc:  fc,
151	}
152
153	var state decodeState
154	for _, hf := range frame.Fields {
155		state.processHeaderField(hf)
156	}
157	if err := state.err; err != nil {
158		if se, ok := err.(StreamError); ok {
159			t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
160		}
161		return
162	}
163
164	if frame.StreamEnded() {
165		// s is just created by the caller. No lock needed.
166		s.state = streamReadDone
167	}
168	s.recvCompress = state.encoding
169	if state.timeoutSet {
170		s.ctx, s.cancel = context.WithTimeout(context.TODO(), state.timeout)
171	} else {
172		s.ctx, s.cancel = context.WithCancel(context.TODO())
173	}
174	pr := &peer.Peer{
175		Addr: t.conn.RemoteAddr(),
176	}
177	// Attach Auth info if there is any.
178	if t.authInfo != nil {
179		pr.AuthInfo = t.authInfo
180	}
181	s.ctx = peer.NewContext(s.ctx, pr)
182	// Cache the current stream to the context so that the server application
183	// can find out. Required when the server wants to send some metadata
184	// back to the client (unary call only).
185	s.ctx = newContextWithStream(s.ctx, s)
186	// Attach the received metadata to the context.
187	if len(state.mdata) > 0 {
188		s.ctx = metadata.NewContext(s.ctx, state.mdata)
189	}
190
191	s.dec = &recvBufferReader{
192		ctx:  s.ctx,
193		recv: s.buf,
194	}
195	s.recvCompress = state.encoding
196	s.method = state.method
197	t.mu.Lock()
198	if t.state != reachable {
199		t.mu.Unlock()
200		return
201	}
202	if uint32(len(t.activeStreams)) >= t.maxStreams {
203		t.mu.Unlock()
204		t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
205		return
206	}
207	s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
208	t.activeStreams[s.id] = s
209	t.mu.Unlock()
210	s.windowHandler = func(n int) {
211		t.updateWindow(s, uint32(n))
212	}
213	handle(s)
214}
215
216// HandleStreams receives incoming streams using the given handler. This is
217// typically run in a separate goroutine.
218func (t *http2Server) HandleStreams(handle func(*Stream)) {
219	// Check the validity of client preface.
220	preface := make([]byte, len(clientPreface))
221	if _, err := io.ReadFull(t.conn, preface); err != nil {
222		grpclog.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
223		t.Close()
224		return
225	}
226	if !bytes.Equal(preface, clientPreface) {
227		grpclog.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
228		t.Close()
229		return
230	}
231
232	frame, err := t.framer.readFrame()
233	if err != nil {
234		grpclog.Printf("transport: http2Server.HandleStreams failed to read frame: %v", err)
235		t.Close()
236		return
237	}
238	sf, ok := frame.(*http2.SettingsFrame)
239	if !ok {
240		grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
241		t.Close()
242		return
243	}
244	t.handleSettings(sf)
245
246	for {
247		frame, err := t.framer.readFrame()
248		if err != nil {
249			t.Close()
250			return
251		}
252		switch frame := frame.(type) {
253		case *http2.MetaHeadersFrame:
254			id := frame.Header().StreamID
255			if id%2 != 1 || id <= t.maxStreamID {
256				// illegal gRPC stream id.
257				grpclog.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
258				t.Close()
259				break
260			}
261			t.maxStreamID = id
262			t.operateHeaders(frame, handle)
263		case *http2.DataFrame:
264			t.handleData(frame)
265		case *http2.RSTStreamFrame:
266			t.handleRSTStream(frame)
267		case *http2.SettingsFrame:
268			t.handleSettings(frame)
269		case *http2.PingFrame:
270			t.handlePing(frame)
271		case *http2.WindowUpdateFrame:
272			t.handleWindowUpdate(frame)
273		case *http2.GoAwayFrame:
274			break
275		default:
276			grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
277		}
278	}
279}
280
281func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
282	t.mu.Lock()
283	defer t.mu.Unlock()
284	if t.activeStreams == nil {
285		// The transport is closing.
286		return nil, false
287	}
288	s, ok := t.activeStreams[f.Header().StreamID]
289	if !ok {
290		// The stream is already done.
291		return nil, false
292	}
293	return s, true
294}
295
296// updateWindow adjusts the inbound quota for the stream and the transport.
297// Window updates will deliver to the controller for sending when
298// the cumulative quota exceeds the corresponding threshold.
299func (t *http2Server) updateWindow(s *Stream, n uint32) {
300	swu, cwu := s.fc.onRead(n)
301	if swu > 0 {
302		t.controlBuf.put(&windowUpdate{s.id, swu})
303	}
304	if cwu > 0 {
305		t.controlBuf.put(&windowUpdate{0, cwu})
306	}
307}
308
309func (t *http2Server) handleData(f *http2.DataFrame) {
310	// Select the right stream to dispatch.
311	s, ok := t.getStream(f)
312	if !ok {
313		return
314	}
315	size := len(f.Data())
316	if size > 0 {
317		if err := s.fc.onData(uint32(size)); err != nil {
318			if _, ok := err.(ConnectionError); ok {
319				grpclog.Printf("transport: http2Server %v", err)
320				t.Close()
321				return
322			}
323			t.closeStream(s)
324			t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
325			return
326		}
327		// TODO(bradfitz, zhaoq): A copy is required here because there is no
328		// guarantee f.Data() is consumed before the arrival of next frame.
329		// Can this copy be eliminated?
330		data := make([]byte, size)
331		copy(data, f.Data())
332		s.write(recvMsg{data: data})
333	}
334	if f.Header().Flags.Has(http2.FlagDataEndStream) {
335		// Received the end of stream from the client.
336		s.mu.Lock()
337		if s.state != streamDone {
338			if s.state == streamWriteDone {
339				s.state = streamDone
340			} else {
341				s.state = streamReadDone
342			}
343		}
344		s.mu.Unlock()
345		s.write(recvMsg{err: io.EOF})
346	}
347}
348
349func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
350	s, ok := t.getStream(f)
351	if !ok {
352		return
353	}
354	t.closeStream(s)
355}
356
357func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
358	if f.IsAck() {
359		return
360	}
361	var ss []http2.Setting
362	f.ForeachSetting(func(s http2.Setting) error {
363		ss = append(ss, s)
364		return nil
365	})
366	// The settings will be applied once the ack is sent.
367	t.controlBuf.put(&settings{ack: true, ss: ss})
368}
369
370func (t *http2Server) handlePing(f *http2.PingFrame) {
371	pingAck := &ping{ack: true}
372	copy(pingAck.data[:], f.Data[:])
373	t.controlBuf.put(pingAck)
374}
375
376func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
377	id := f.Header().StreamID
378	incr := f.Increment
379	if id == 0 {
380		t.sendQuotaPool.add(int(incr))
381		return
382	}
383	if s, ok := t.getStream(f); ok {
384		s.sendQuotaPool.add(int(incr))
385	}
386}
387
388func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
389	first := true
390	endHeaders := false
391	var err error
392	// Sends the headers in a single batch.
393	for !endHeaders {
394		size := t.hBuf.Len()
395		if size > http2MaxFrameLen {
396			size = http2MaxFrameLen
397		} else {
398			endHeaders = true
399		}
400		if first {
401			p := http2.HeadersFrameParam{
402				StreamID:      s.id,
403				BlockFragment: b.Next(size),
404				EndStream:     endStream,
405				EndHeaders:    endHeaders,
406			}
407			err = t.framer.writeHeaders(endHeaders, p)
408			first = false
409		} else {
410			err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
411		}
412		if err != nil {
413			t.Close()
414			return ConnectionErrorf("transport: %v", err)
415		}
416	}
417	return nil
418}
419
420// WriteHeader sends the header metedata md back to the client.
421func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
422	s.mu.Lock()
423	if s.headerOk || s.state == streamDone {
424		s.mu.Unlock()
425		return ErrIllegalHeaderWrite
426	}
427	s.headerOk = true
428	s.mu.Unlock()
429	if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
430		return err
431	}
432	t.hBuf.Reset()
433	t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
434	t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
435	if s.sendCompress != "" {
436		t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
437	}
438	for k, v := range md {
439		for _, entry := range v {
440			t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
441		}
442	}
443	if err := t.writeHeaders(s, t.hBuf, false); err != nil {
444		return err
445	}
446	t.writableChan <- 0
447	return nil
448}
449
450// WriteStatus sends stream status to the client and terminates the stream.
451// There is no further I/O operations being able to perform on this stream.
452// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
453// OK is adopted.
454func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc string) error {
455	var headersSent bool
456	s.mu.Lock()
457	if s.state == streamDone {
458		s.mu.Unlock()
459		return nil
460	}
461	if s.headerOk {
462		headersSent = true
463	}
464	s.mu.Unlock()
465	if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
466		return err
467	}
468	t.hBuf.Reset()
469	if !headersSent {
470		t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
471		t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
472	}
473	t.hEnc.WriteField(
474		hpack.HeaderField{
475			Name:  "grpc-status",
476			Value: strconv.Itoa(int(statusCode)),
477		})
478	t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: statusDesc})
479	// Attach the trailer metadata.
480	for k, v := range s.trailer {
481		for _, entry := range v {
482			t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: entry})
483		}
484	}
485	if err := t.writeHeaders(s, t.hBuf, true); err != nil {
486		t.Close()
487		return err
488	}
489	t.closeStream(s)
490	t.writableChan <- 0
491	return nil
492}
493
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returned if it fails (e.g., framing error, transport error).
//
// If no header has been claimed for the stream yet, a HEADERS frame with
// ":status", "content-type" and (when set) "grpc-encoding" is written first.
// The data is then sent in chunks of at most http2MaxFrameLen bytes; every
// chunk is additionally bounded by the quota obtained from the stream's and
// the transport's send quota pools. opts must be non-nil: opts.Last is read
// when deciding whether to force a flush.
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
	// TODO(zhaoq): Support multi-writers for a single stream.
	var writeHeaderFrame bool
	s.mu.Lock()
	if !s.headerOk {
		// Claim the header so that WriteHeader cannot send it a second time.
		writeHeaderFrame = true
		s.headerOk = true
	}
	s.mu.Unlock()
	if writeHeaderFrame {
		// Take the transport write lock for the HEADERS frame.
		if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
			return err
		}
		t.hBuf.Reset()
		t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
		t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
		if s.sendCompress != "" {
			t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
		}
		p := http2.HeadersFrameParam{
			StreamID:      s.id,
			BlockFragment: t.hBuf.Bytes(),
			EndHeaders:    true,
		}
		if err := t.framer.writeHeaders(false, p); err != nil {
			t.Close()
			return ConnectionErrorf("transport: %v", err)
		}
		// Release the transport write lock.
		t.writableChan <- 0
	}
	r := bytes.NewBuffer(data)
	for {
		if r.Len() == 0 {
			return nil
		}
		size := http2MaxFrameLen
		// NOTE(review): add(0) presumably re-arms the pool's acquire channel
		// with the current quota — confirm against quotaPool.
		s.sendQuotaPool.add(0)
		// Wait until the stream has some quota to send the data.
		sq, err := wait(s.ctx, t.shutdownChan, s.sendQuotaPool.acquire())
		if err != nil {
			return err
		}
		t.sendQuotaPool.add(0)
		// Wait until the transport has some quota to send the data.
		tq, err := wait(s.ctx, t.shutdownChan, t.sendQuotaPool.acquire())
		if err != nil {
			if _, ok := err.(StreamError); ok {
				t.sendQuotaPool.cancel()
			}
			return err
		}
		// The chunk is bounded by the frame size limit and both quotas.
		if sq < size {
			size = sq
		}
		if tq < size {
			size = tq
		}
		p := r.Next(size)
		ps := len(p)
		if ps < sq {
			// Overbooked stream quota. Return it back.
			s.sendQuotaPool.add(sq - ps)
		}
		if ps < tq {
			// Overbooked transport quota. Return it back.
			t.sendQuotaPool.add(tq - ps)
		}
		// Register as a pending writer before waiting for the write lock.
		t.framer.adjustNumWriters(1)
		// Got some quota. Try to acquire writing privilege on the
		// transport.
		if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
			if t.framer.adjustNumWriters(-1) == 0 {
				// This writer is the last one in this batch and has the
				// responsibility to flush the buffered frames. It queues
				// a flush request to controlBuf instead of flushing directly
				// in order to avoid the race with other writing or flushing.
				t.controlBuf.put(&flushIO{})
			}
			return err
		}
		// Force a flush only for the final chunk, when this is the only
		// pending writer and the caller did not mark the message as the last
		// one.
		var forceFlush bool
		if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
			forceFlush = true
		}
		if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
			t.Close()
			return ConnectionErrorf("transport: %v", err)
		}
		// The last pending writer flushes whatever is still buffered.
		if t.framer.adjustNumWriters(-1) == 0 {
			t.framer.flushWrite()
		}
		// Release the transport write lock.
		t.writableChan <- 0
	}

}
591
592func (t *http2Server) applySettings(ss []http2.Setting) {
593	for _, s := range ss {
594		if s.ID == http2.SettingInitialWindowSize {
595			t.mu.Lock()
596			defer t.mu.Unlock()
597			for _, stream := range t.activeStreams {
598				stream.sendQuotaPool.reset(int(s.Val - t.streamSendQuota))
599			}
600			t.streamSendQuota = s.Val
601		}
602
603	}
604}
605
// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the client.
// It takes items off controlBuf one at a time, acquires the transport write
// lock (writableChan) for each of them, writes the matching frame and
// releases the lock again. It returns once shutdownChan is closed.
func (t *http2Server) controller() {
	for {
		select {
		case i := <-t.controlBuf.get():
			t.controlBuf.load()
			// Wait for the write lock, but give up if the transport shuts
			// down in the meantime.
			select {
			case <-t.writableChan:
				// The results of the framer calls below are not checked.
				switch i := i.(type) {
				case *windowUpdate:
					t.framer.writeWindowUpdate(true, i.streamID, i.increment)
				case *settings:
					if i.ack {
						// Ack the client's settings first, then apply them.
						t.framer.writeSettingsAck(true)
						t.applySettings(i.ss)
					} else {
						t.framer.writeSettings(true, i.ss...)
					}
				case *resetStream:
					t.framer.writeRSTStream(true, i.streamID, i.code)
				case *flushIO:
					t.framer.flushWrite()
				case *ping:
					t.framer.writePing(true, i.ack, i.data)
				default:
					grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
				}
				// Release the write lock.
				t.writableChan <- 0
				continue
			case <-t.shutdownChan:
				return
			}
		case <-t.shutdownChan:
			return
		}
	}
}
644
645// Close starts shutting down the http2Server transport.
646// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
647// could cause some resource issue. Revisit this later.
648func (t *http2Server) Close() (err error) {
649	t.mu.Lock()
650	if t.state == closing {
651		t.mu.Unlock()
652		return errors.New("transport: Close() was already called")
653	}
654	t.state = closing
655	streams := t.activeStreams
656	t.activeStreams = nil
657	t.mu.Unlock()
658	close(t.shutdownChan)
659	err = t.conn.Close()
660	// Cancel all active streams.
661	for _, s := range streams {
662		s.cancel()
663	}
664	return
665}
666
667// closeStream clears the footprint of a stream when the stream is not needed
668// any more.
669func (t *http2Server) closeStream(s *Stream) {
670	t.mu.Lock()
671	delete(t.activeStreams, s.id)
672	t.mu.Unlock()
673	if q := s.fc.restoreConn(); q > 0 {
674		t.controlBuf.put(&windowUpdate{0, q})
675	}
676	s.mu.Lock()
677	if s.state == streamDone {
678		s.mu.Unlock()
679		return
680	}
681	s.state = streamDone
682	s.mu.Unlock()
683	// In case stream sending and receiving are invoked in separate
684	// goroutines (e.g., bi-directional streaming), cancel needs to be
685	// called to interrupt the potential blocking on other goroutines.
686	s.cancel()
687}
688
689func (t *http2Server) RemoteAddr() net.Addr {
690	return t.conn.RemoteAddr()
691}
692