1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
// Package transport defines and implements a message-oriented communication
// channel used to complete various transactions (e.g., an RPC).  It is meant
// for grpc-internal usage and is not intended to be imported directly by users.
22package transport // externally used as import "google.golang.org/grpc/transport"
23
24import (
25	"errors"
26	"fmt"
27	"io"
28	"net"
29	"sync"
30	"sync/atomic"
31
32	"golang.org/x/net/context"
33	"google.golang.org/grpc/codes"
34	"google.golang.org/grpc/credentials"
35	"google.golang.org/grpc/keepalive"
36	"google.golang.org/grpc/metadata"
37	"google.golang.org/grpc/stats"
38	"google.golang.org/grpc/status"
39	"google.golang.org/grpc/tap"
40)
41
// recvMsg is one unit handed from the transport to a stream reader, with all
// transport-protocol specific information already removed.
type recvMsg struct {
	// data is the received payload. It is nil whenever err is non-nil.
	data []byte
	// err classifies the message:
	//   nil:             some data was received.
	//   io.EOF:          the stream is completed.
	//   other non-nil:   the transport failed.
	err error
}

// recvBuffer is an unbounded queue of recvMsg values exposed through a channel.
//
// It differs from controlBuffer only in that it carries concrete recvMsg
// structs instead of values implementing the "item" interface. recvBuffer is
// written to much more often than controlBuffer, and the concrete type helps
// put avoid allocations.
type recvBuffer struct {
	c       chan recvMsg // delivers the oldest pending message to the reader
	mu      sync.Mutex   // held by put and load while they touch backlog, err and send on c
	backlog []recvMsg    // messages that did not fit into c, oldest first
	err     error        // err of the last accepted message; once non-nil, put ignores further input
}

// newRecvBuffer returns an empty recvBuffer whose channel can hold one message.
func newRecvBuffer() *recvBuffer {
	return &recvBuffer{c: make(chan recvMsg, 1)}
}

// put enqueues r. The message goes straight onto the channel when nothing is
// queued ahead of it and the channel has room; otherwise it is appended to the
// backlog. After a message carrying a non-nil err has been accepted, every
// later put is silently dropped.
func (b *recvBuffer) put(r recvMsg) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.err != nil {
		// An earlier message already reported an error: accept neither
		// more data nor more errors.
		return
	}
	b.err = r.err

	// Only bypass the backlog when it is empty, so ordering is preserved.
	if len(b.backlog) == 0 {
		select {
		case b.c <- r:
			return
		default:
			// Channel is full; fall through and queue the message.
		}
	}
	b.backlog = append(b.backlog, r)
}

// load moves the oldest backlog entry onto the channel if there is one and the
// channel currently has room. It never blocks.
func (b *recvBuffer) load() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.backlog) == 0 {
		return
	}
	select {
	case b.c <- b.backlog[0]:
		// Clear the slot so the backing array no longer references the payload.
		b.backlog[0] = recvMsg{}
		b.backlog = b.backlog[1:]
	default:
	}
}

// get returns the receive-only channel on which queued messages are delivered.
//
// After receiving a recvMsg from it, the caller should call load so that the
// next queued recvMsg, if any, is moved onto the channel.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
112
// recvBufferReader implements the io.Reader interface on top of a recvBuffer.
// Its read method blocks until either a message is available on recv or
// ctxDone fires.
type recvBufferReader struct {
	ctx     context.Context // its Err() is reported (via ContextErr) when ctxDone fires during a read
	ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
	recv    *recvBuffer     // source of incoming messages
	last    []byte          // Stores the remaining data in the previous calls.
	err     error           // first error produced by read; Read keeps returning it afterwards
}
123
124// Read reads the next len(p) bytes from last. If last is drained, it tries to
125// read additional data from recv. It blocks if there no additional data available
126// in recv. If Read returns any non-nil error, it will continue to return that error.
127func (r *recvBufferReader) Read(p []byte) (n int, err error) {
128	if r.err != nil {
129		return 0, r.err
130	}
131	n, r.err = r.read(p)
132	return n, r.err
133}
134
135func (r *recvBufferReader) read(p []byte) (n int, err error) {
136	if r.last != nil && len(r.last) > 0 {
137		// Read remaining data left in last call.
138		copied := copy(p, r.last)
139		r.last = r.last[copied:]
140		return copied, nil
141	}
142	select {
143	case <-r.ctxDone:
144		return 0, ContextErr(r.ctx.Err())
145	case m := <-r.recv.get():
146		r.recv.load()
147		if m.err != nil {
148			return 0, m.err
149		}
150		copied := copy(p, m.data)
151		r.last = m.data[copied:]
152		return copied, nil
153	}
154}
155
// streamState is the lifecycle state of a Stream. Its underlying type is
// uint32 so that Stream can load, swap and compare-and-swap it with the
// sync/atomic functions.
type streamState uint32

const (
	// streamActive is the zero value of streamState.
	streamActive    streamState = iota
	streamWriteDone             // EndStream sent
	streamReadDone              // EndStream received
	streamDone                  // the entire stream is finished.
)
164
// Stream represents an RPC in the transport layer.
//
// The state field is read and written with sync/atomic operations through
// swapState, compareAndSwapState and getState; bytesReceived and unprocessed
// are read atomically by BytesReceived and Unprocessed.
type Stream struct {
	id           uint32
	st           ServerTransport    // nil for client side Stream
	ctx          context.Context    // the associated context of the stream
	cancel       context.CancelFunc // always nil for client side Stream
	done         chan struct{}      // closed at the end of stream to unblock writers; client side only.
	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
	method       string             // the associated RPC method of the stream
	recvCompress string             // returned by RecvCompress once waiting on the header succeeds
	sendCompress string             // set by SetSendCompress
	buf          *recvBuffer        // receives the messages passed to write
	trReader     io.Reader          // read by Read, which asserts it to be a *transportReader
	fc           *inFlow
	recvQuota    uint32
	wq           *writeQuota

	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if needed.
	requestRead func(int)

	headerChan chan struct{} // closed to indicate the end of header metadata.
	headerDone uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
	header     metadata.MD   // the received header metadata.
	trailer    metadata.MD   // the key-value map of trailer metadata.

	headerOk bool // becomes true when the first header is about to be sent
	state    streamState

	status *status.Status // the status error received from the server

	bytesReceived uint32 // indicates whether any bytes have been received on this stream
	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream

	// contentSubtype is the content-subtype for requests.
	// this must be lowercase or the behavior is undefined.
	contentSubtype string
}
203
// swapState atomically stores st as the stream's state and returns the state
// it replaced.
func (s *Stream) swapState(st streamState) streamState {
	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
}
207
// compareAndSwapState atomically sets the stream's state to newState if it is
// currently oldState. It reports whether the swap took place.
func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
}
211
// getState atomically loads and returns the stream's current state.
func (s *Stream) getState() streamState {
	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
}
215
216func (s *Stream) waitOnHeader() error {
217	if s.headerChan == nil {
218		// On the server headerChan is always nil since a stream originates
219		// only after having received headers.
220		return nil
221	}
222	select {
223	case <-s.ctx.Done():
224		return ContextErr(s.ctx.Err())
225	case <-s.headerChan:
226		return nil
227	}
228}
229
230// RecvCompress returns the compression algorithm applied to the inbound
231// message. It is empty string if there is no compression applied.
232func (s *Stream) RecvCompress() string {
233	if err := s.waitOnHeader(); err != nil {
234		return ""
235	}
236	return s.recvCompress
237}
238
// SetSendCompress records str as the compression algorithm of the stream.
// The assignment is a plain, unsynchronized write to the sendCompress field.
func (s *Stream) SetSendCompress(str string) {
	s.sendCompress = str
}
243
// Done returns a channel which is closed when the stream receives the final
// status from the server. It returns the stream's done field as a
// receive-only channel.
func (s *Stream) Done() <-chan struct{} {
	return s.done
}
249
250// Header acquires the key-value pairs of header metadata once it
251// is available. It blocks until i) the metadata is ready or ii) there is no
252// header metadata or iii) the stream is canceled/expired.
253func (s *Stream) Header() (metadata.MD, error) {
254	err := s.waitOnHeader()
255	// Even if the stream is closed, header is returned if available.
256	select {
257	case <-s.headerChan:
258		if s.header == nil {
259			return nil, nil
260		}
261		return s.header.Copy(), nil
262	default:
263	}
264	return nil, err
265}
266
267// Trailer returns the cached trailer metedata. Note that if it is not called
268// after the entire stream is done, it could return an empty MD. Client
269// side only.
270// It can be safely read only after stream has ended that is either read
271// or write have returned io.EOF.
272func (s *Stream) Trailer() metadata.MD {
273	c := s.trailer.Copy()
274	return c
275}
276
// ServerTransport returns the underlying ServerTransport for the stream (the
// st field). The client side stream always returns nil.
func (s *Stream) ServerTransport() ServerTransport {
	return s.st
}
282
// ContentSubtype returns the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". The value is expected to be lowercase already;
// this accessor returns the stored field unchanged.  See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
func (s *Stream) ContentSubtype() string {
	return s.contentSubtype
}
291
// Context returns the context associated with the stream (the ctx field).
func (s *Stream) Context() context.Context {
	return s.ctx
}
296
// Method returns the RPC method associated with the stream.
func (s *Stream) Method() string {
	return s.method
}
301
// Status returns the status received from the server.
// Status can be read safely only after the stream has ended,
// that is, read or write has returned io.EOF. The field is returned as is,
// without any synchronization.
func (s *Stream) Status() *status.Status {
	return s.status
}
308
309// SetHeader sets the header metadata. This can be called multiple times.
310// Server side only.
311// This should not be called in parallel to other data writes.
312func (s *Stream) SetHeader(md metadata.MD) error {
313	if md.Len() == 0 {
314		return nil
315	}
316	if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) {
317		return ErrIllegalHeaderWrite
318	}
319	s.header = metadata.Join(s.header, md)
320	return nil
321}
322
323// SendHeader sends the given header metadata. The given metadata is
324// combined with any metadata set by previous calls to SetHeader and
325// then written to the transport stream.
326func (s *Stream) SendHeader(md metadata.MD) error {
327	t := s.ServerTransport()
328	return t.WriteHeader(s, md)
329}
330
331// SetTrailer sets the trailer metadata which will be sent with the RPC status
332// by the server. This can be called multiple times. Server side only.
333// This should not be called parallel to other data writes.
334func (s *Stream) SetTrailer(md metadata.MD) error {
335	if md.Len() == 0 {
336		return nil
337	}
338	s.trailer = metadata.Join(s.trailer, md)
339	return nil
340}
341
// write hands m to the stream's receive buffer, from which it is later
// consumed through the buffer's channel.
func (s *Stream) write(m recvMsg) {
	s.buf.put(m)
}
345
346// Read reads all p bytes from the wire for this stream.
347func (s *Stream) Read(p []byte) (n int, err error) {
348	// Don't request a read if there was an error earlier
349	if er := s.trReader.(*transportReader).er; er != nil {
350		return 0, er
351	}
352	s.requestRead(len(p))
353	return io.ReadFull(s.trReader, p)
354}
355
// transportReader reads all the data available for this Stream from the
// transport and passes them into the decoder, which converts them into a gRPC
// message stream. The error is io.EOF when the stream is done or another
// non-nil error if the stream broke.
type transportReader struct {
	// reader is the underlying source of bytes.
	reader io.Reader
	// windowHandler controls the window update procedure for both this
	// particular stream and the associated transport. It is invoked with the
	// byte count of every successful read.
	windowHandler func(int)
	// er holds the most recent error returned by reader, if any.
	er error
}

// Read reads from the underlying reader into p. On success the number of
// bytes read is reported to windowHandler. On failure the error is recorded
// in er and returned, and windowHandler is not called.
func (t *transportReader) Read(p []byte) (n int, err error) {
	if n, err = t.reader.Read(p); err != nil {
		t.er = err
		return n, err
	}
	t.windowHandler(n)
	return n, nil
}
377
// BytesReceived indicates whether any bytes have been received on this stream.
// It atomically loads the bytesReceived flag and reports whether it equals 1.
func (s *Stream) BytesReceived() bool {
	return atomic.LoadUint32(&s.bytesReceived) == 1
}
382
// Unprocessed indicates whether the server did not process this stream --
// i.e. it sent a refused stream or GOAWAY including this stream ID.
// It atomically loads the unprocessed flag and reports whether it equals 1.
func (s *Stream) Unprocessed() bool {
	return atomic.LoadUint32(&s.unprocessed) == 1
}
388
// GoString is implemented by Stream so context.String() won't
// race when printing %#v. The result contains only the stream's address and
// its method, so no other field is read while formatting.
func (s *Stream) GoString() string {
	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
394
// transportState is the state of a transport.
// NOTE(review): the transitions between these states are implemented outside
// this section of the file.
type transportState int

const (
	// reachable is the zero value of transportState.
	reachable transportState = iota
	closing
	draining
)
403
// ServerConfig consists of all the configurations to establish a server transport.
// NewServerTransport passes it unchanged to newHTTP2Server.
// NOTE(review): how each field is interpreted is decided by newHTTP2Server,
// which is not visible in this section; the window and buffer size fields
// presumably mirror the identically named ConnectOptions fields -- confirm.
type ServerConfig struct {
	MaxStreams            uint32
	AuthInfo              credentials.AuthInfo
	InTapHandle           tap.ServerInHandle
	StatsHandler          stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	ChannelzParentID      int64
}
418
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails. It delegates to newHTTP2Server with conn and config; the
// protocol argument is currently not used.
func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
	return newHTTP2Server(conn, config)
}
424
// ConnectOptions covers all relevant options for communicating with the server.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Authority is the :authority pseudo-header to use. This field has no effect if
	// TransportCredentials is set.
	Authority string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to set up a client connection.
	TransportCredentials credentials.TransportCredentials
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandler stores the handler for stats.
	StatsHandler stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of the write buffer, which in turn determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of the read buffer, which in turn determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// ChannelzParentID sets the id of the addrConn which initiated the creation of this client transport.
	ChannelzParentID int64
}
455
// TargetInfo contains the information of the target such as network address and metadata.
// NOTE(review): Authority is presumably the authority to use for this target;
// its interaction with ConnectOptions.Authority is not visible here -- confirm
// against newHTTP2Client.
type TargetInfo struct {
	Addr      string
	Metadata  interface{}
	Authority string
}
462
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller. It forwards every argument unchanged to
// newHTTP2Client.
// NOTE(review): the distinct roles of connectCtx and ctx, and the moment at
// which onSuccess is invoked, are defined by newHTTP2Client, which is not
// visible in this section.
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
}
468
// Options provides additional hints and information for message
// transmission. It is passed by pointer to the Write methods of
// ClientTransport and ServerTransport.
type Options struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool

	// Delay is a hint to the transport implementation for whether
	// the data could be buffered for a batching write. The
	// transport implementation may ignore the hint.
	Delay bool
}
481
// CallHdr carries the information of a particular RPC. It is the per-call
// input to ClientTransport.NewStream.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// Flush indicates whether a new stream command should be sent
	// to the peer without waiting for the first data. This is
	// only a hint.
	// If it's true, the transport may modify the flush decision
	// for performance purposes.
	// If it's false, new stream will never be flushed.
	Flush bool

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string
}
513
// ClientTransport is the common interface for all gRPC client-side transport
// implementations.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close() error

	// GracefulClose starts to tear down the transport. It stops accepting
	// new RPCs and waits for the completion of the pending RPCs.
	GracefulClose() error

	// Write sends the data for the given stream. A nil stream indicates
	// the write is to be performed on the transport as a whole.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)

	// CloseStream clears the footprint of a stream when the stream is
	// not needed any more. The err indicates the error incurred when
	// CloseStream is called. Must be called when a stream is finished
	// unless the associated transport is closing.
	CloseStream(stream *Stream, err error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why the GoAway frame was received.
	GetGoAwayReason() GoAwayReason

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
560
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler.
	// NOTE(review): the purpose of the second function argument (context and
	// string in, context out) is not described in this section -- confirm
	// against the implementation.
	HandleStreams(func(*Stream), func(context.Context, string) context.Context)

	// WriteHeader sends the header metadata for the given stream.
	// WriteHeader may not be called on all streams.
	WriteHeader(s *Stream, md metadata.MD) error

	// Write sends the data for the given stream.
	// Write may not be called on all streams.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// WriteStatus sends the status of a stream to the client.  WriteStatus is
	// the final call made on a stream and always occurs.
	WriteStatus(s *Stream, st *status.Status) error

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close() error

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// Drain notifies the client that this ServerTransport stops accepting new RPCs.
	Drain()

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
599
600// streamErrorf creates an StreamError with the specified error code and description.
601func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
602	return StreamError{
603		Code: c,
604		Desc: fmt.Sprintf(format, a...),
605	}
606}
607
// connectionErrorf builds a ConnectionError. The description is produced by
// formatting a according to format, as fmt.Sprintf does; temp marks the error
// as temporary and e is the underlying cause, which may be nil.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
	desc := fmt.Sprintf(format, a...)
	return ConnectionError{Desc: desc, temp: temp, err: e}
}

// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string // human-readable description, quoted in Error()
	temp bool   // value reported by Temporary
	err  error  // underlying cause reported by Origin; may be nil
}

// Error implements the error interface. Only Desc appears in the message;
// the underlying cause is not included.
func (e ConnectionError) Error() string {
	return fmt.Sprintf("connection error: desc = %q", e.Desc)
}

// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error of this connection error. It never
// returns nil: when no original error was recorded, the ConnectionError
// itself is returned.
func (e ConnectionError) Origin() error {
	if e.err != nil {
		return e.err
	}
	return e
}
643
// Shared error and status values used by the transport implementations.
var (
	// ErrConnClosing indicates that the transport is closing. It is created
	// as a temporary ConnectionError without an underlying cause.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to indicate
	// an error to the application layer.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
658
659// TODO: See if we can replace StreamError with status package errors.
660
// StreamError is an error that only affects one stream within a connection.
// Code is the gRPC status code of the failure and Desc is its description;
// both appear in the message returned by Error.
type StreamError struct {
	Code codes.Code
	Desc string
}
666
// Error implements the error interface. The message contains the code,
// formatted with %s, and the quoted description.
func (e StreamError) Error() string {
	return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
670
// GoAwayReason contains the reason for the GoAway frame received. It is the
// value returned by ClientTransport.GetGoAwayReason.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame is received. It is the
	// zero value of GoAwayReason.
	GoAwayInvalid GoAwayReason = 0
	// GoAwayNoReason is the default value when GoAway frame is received.
	GoAwayNoReason GoAwayReason = 1
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings GoAwayReason = 2
)
684