1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC).  It is meant for
21// grpc-internal usage and is not intended to be imported directly by users.
22package transport
23
24import (
25	"bytes"
26	"context"
27	"errors"
28	"fmt"
29	"io"
30	"net"
31	"sync"
32	"sync/atomic"
33
34	"google.golang.org/grpc/codes"
35	"google.golang.org/grpc/credentials"
36	"google.golang.org/grpc/keepalive"
37	"google.golang.org/grpc/metadata"
38	"google.golang.org/grpc/resolver"
39	"google.golang.org/grpc/stats"
40	"google.golang.org/grpc/status"
41	"google.golang.org/grpc/tap"
42)
43
44const logLevel = 2
45
46type bufferPool struct {
47	pool sync.Pool
48}
49
50func newBufferPool() *bufferPool {
51	return &bufferPool{
52		pool: sync.Pool{
53			New: func() interface{} {
54				return new(bytes.Buffer)
55			},
56		},
57	}
58}
59
60func (p *bufferPool) get() *bytes.Buffer {
61	return p.pool.Get().(*bytes.Buffer)
62}
63
64func (p *bufferPool) put(b *bytes.Buffer) {
65	p.pool.Put(b)
66}
67
68// recvMsg represents the received msg from the transport. All transport
69// protocol specific info has been removed.
70type recvMsg struct {
71	buffer *bytes.Buffer
72	// nil: received some data
73	// io.EOF: stream is completed. data is nil.
74	// other non-nil error: transport failure. data is nil.
75	err error
76}
77
78// recvBuffer is an unbounded channel of recvMsg structs.
79//
80// Note: recvBuffer differs from buffer.Unbounded only in the fact that it
81// holds a channel of recvMsg structs instead of objects implementing "item"
82// interface. recvBuffer is written to much more often and using strict recvMsg
83// structs helps avoid allocation in "recvBuffer.put"
84type recvBuffer struct {
85	c       chan recvMsg
86	mu      sync.Mutex
87	backlog []recvMsg
88	err     error
89}
90
91func newRecvBuffer() *recvBuffer {
92	b := &recvBuffer{
93		c: make(chan recvMsg, 1),
94	}
95	return b
96}
97
98func (b *recvBuffer) put(r recvMsg) {
99	b.mu.Lock()
100	if b.err != nil {
101		b.mu.Unlock()
102		// An error had occurred earlier, don't accept more
103		// data or errors.
104		return
105	}
106	b.err = r.err
107	if len(b.backlog) == 0 {
108		select {
109		case b.c <- r:
110			b.mu.Unlock()
111			return
112		default:
113		}
114	}
115	b.backlog = append(b.backlog, r)
116	b.mu.Unlock()
117}
118
119func (b *recvBuffer) load() {
120	b.mu.Lock()
121	if len(b.backlog) > 0 {
122		select {
123		case b.c <- b.backlog[0]:
124			b.backlog[0] = recvMsg{}
125			b.backlog = b.backlog[1:]
126		default:
127		}
128	}
129	b.mu.Unlock()
130}
131
132// get returns the channel that receives a recvMsg in the buffer.
133//
134// Upon receipt of a recvMsg, the caller should call load to send another
135// recvMsg onto the channel if there is any.
136func (b *recvBuffer) get() <-chan recvMsg {
137	return b.c
138}
139
140// recvBufferReader implements io.Reader interface to read the data from
141// recvBuffer.
142type recvBufferReader struct {
143	closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
144	ctx         context.Context
145	ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
146	recv        *recvBuffer
147	last        *bytes.Buffer // Stores the remaining data in the previous calls.
148	err         error
149	freeBuffer  func(*bytes.Buffer)
150}
151
152// Read reads the next len(p) bytes from last. If last is drained, it tries to
153// read additional data from recv. It blocks if there no additional data available
154// in recv. If Read returns any non-nil error, it will continue to return that error.
155func (r *recvBufferReader) Read(p []byte) (n int, err error) {
156	if r.err != nil {
157		return 0, r.err
158	}
159	if r.last != nil {
160		// Read remaining data left in last call.
161		copied, _ := r.last.Read(p)
162		if r.last.Len() == 0 {
163			r.freeBuffer(r.last)
164			r.last = nil
165		}
166		return copied, nil
167	}
168	if r.closeStream != nil {
169		n, r.err = r.readClient(p)
170	} else {
171		n, r.err = r.read(p)
172	}
173	return n, r.err
174}
175
176func (r *recvBufferReader) read(p []byte) (n int, err error) {
177	select {
178	case <-r.ctxDone:
179		return 0, ContextErr(r.ctx.Err())
180	case m := <-r.recv.get():
181		return r.readAdditional(m, p)
182	}
183}
184
185func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
186	// If the context is canceled, then closes the stream with nil metadata.
187	// closeStream writes its error parameter to r.recv as a recvMsg.
188	// r.readAdditional acts on that message and returns the necessary error.
189	select {
190	case <-r.ctxDone:
191		// Note that this adds the ctx error to the end of recv buffer, and
192		// reads from the head. This will delay the error until recv buffer is
193		// empty, thus will delay ctx cancellation in Recv().
194		//
195		// It's done this way to fix a race between ctx cancel and trailer. The
196		// race was, stream.Recv() may return ctx error if ctxDone wins the
197		// race, but stream.Trailer() may return a non-nil md because the stream
198		// was not marked as done when trailer is received. This closeStream
199		// call will mark stream as done, thus fix the race.
200		//
201		// TODO: delaying ctx error seems like a unnecessary side effect. What
202		// we really want is to mark the stream as done, and return ctx error
203		// faster.
204		r.closeStream(ContextErr(r.ctx.Err()))
205		m := <-r.recv.get()
206		return r.readAdditional(m, p)
207	case m := <-r.recv.get():
208		return r.readAdditional(m, p)
209	}
210}
211
212func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
213	r.recv.load()
214	if m.err != nil {
215		return 0, m.err
216	}
217	copied, _ := m.buffer.Read(p)
218	if m.buffer.Len() == 0 {
219		r.freeBuffer(m.buffer)
220		r.last = nil
221	} else {
222		r.last = m.buffer
223	}
224	return copied, nil
225}
226
227type streamState uint32
228
229const (
230	streamActive    streamState = iota
231	streamWriteDone             // EndStream sent
232	streamReadDone              // EndStream received
233	streamDone                  // the entire stream is finished.
234)
235
236// Stream represents an RPC in the transport layer.
237type Stream struct {
238	id           uint32
239	st           ServerTransport    // nil for client side Stream
240	ct           *http2Client       // nil for server side Stream
241	ctx          context.Context    // the associated context of the stream
242	cancel       context.CancelFunc // always nil for client side Stream
243	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
244	doneFunc     func()             // invoked at the end of stream on client side.
245	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
246	method       string             // the associated RPC method of the stream
247	recvCompress string
248	sendCompress string
249	buf          *recvBuffer
250	trReader     io.Reader
251	fc           *inFlow
252	wq           *writeQuota
253
254	// Callback to state application's intentions to read data. This
255	// is used to adjust flow control, if needed.
256	requestRead func(int)
257
258	headerChan       chan struct{} // closed to indicate the end of header metadata.
259	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
260	// headerValid indicates whether a valid header was received.  Only
261	// meaningful after headerChan is closed (always call waitOnHeader() before
262	// reading its value).  Not valid on server side.
263	headerValid bool
264
265	// hdrMu protects header and trailer metadata on the server-side.
266	hdrMu sync.Mutex
267	// On client side, header keeps the received header metadata.
268	//
269	// On server side, header keeps the header set by SetHeader(). The complete
270	// header will merged into this after t.WriteHeader() is called.
271	header  metadata.MD
272	trailer metadata.MD // the key-value map of trailer metadata.
273
274	noHeaders bool // set if the client never received headers (set only after the stream is done).
275
276	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
277	headerSent uint32
278
279	state streamState
280
281	// On client-side it is the status error received from the server.
282	// On server-side it is unused.
283	status *status.Status
284
285	bytesReceived uint32 // indicates whether any bytes have been received on this stream
286	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream
287
288	// contentSubtype is the content-subtype for requests.
289	// this must be lowercase or the behavior is undefined.
290	contentSubtype string
291}
292
293// isHeaderSent is only valid on the server-side.
294func (s *Stream) isHeaderSent() bool {
295	return atomic.LoadUint32(&s.headerSent) == 1
296}
297
298// updateHeaderSent updates headerSent and returns true
299// if it was alreay set. It is valid only on server-side.
300func (s *Stream) updateHeaderSent() bool {
301	return atomic.SwapUint32(&s.headerSent, 1) == 1
302}
303
304func (s *Stream) swapState(st streamState) streamState {
305	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
306}
307
308func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
309	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
310}
311
312func (s *Stream) getState() streamState {
313	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
314}
315
316func (s *Stream) waitOnHeader() {
317	if s.headerChan == nil {
318		// On the server headerChan is always nil since a stream originates
319		// only after having received headers.
320		return
321	}
322	select {
323	case <-s.ctx.Done():
324		// Close the stream to prevent headers/trailers from changing after
325		// this function returns.
326		s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
327		// headerChan could possibly not be closed yet if closeStream raced
328		// with operateHeaders; wait until it is closed explicitly here.
329		<-s.headerChan
330	case <-s.headerChan:
331	}
332}
333
334// RecvCompress returns the compression algorithm applied to the inbound
335// message. It is empty string if there is no compression applied.
336func (s *Stream) RecvCompress() string {
337	s.waitOnHeader()
338	return s.recvCompress
339}
340
341// SetSendCompress sets the compression algorithm to the stream.
342func (s *Stream) SetSendCompress(str string) {
343	s.sendCompress = str
344}
345
346// Done returns a channel which is closed when it receives the final status
347// from the server.
348func (s *Stream) Done() <-chan struct{} {
349	return s.done
350}
351
352// Header returns the header metadata of the stream.
353//
354// On client side, it acquires the key-value pairs of header metadata once it is
355// available. It blocks until i) the metadata is ready or ii) there is no header
356// metadata or iii) the stream is canceled/expired.
357//
358// On server side, it returns the out header after t.WriteHeader is called.  It
359// does not block and must not be called until after WriteHeader.
360func (s *Stream) Header() (metadata.MD, error) {
361	if s.headerChan == nil {
362		// On server side, return the header in stream. It will be the out
363		// header after t.WriteHeader is called.
364		return s.header.Copy(), nil
365	}
366	s.waitOnHeader()
367	if !s.headerValid {
368		return nil, s.status.Err()
369	}
370	return s.header.Copy(), nil
371}
372
373// TrailersOnly blocks until a header or trailers-only frame is received and
374// then returns true if the stream was trailers-only.  If the stream ends
375// before headers are received, returns true, nil.  Client-side only.
376func (s *Stream) TrailersOnly() bool {
377	s.waitOnHeader()
378	return s.noHeaders
379}
380
381// Trailer returns the cached trailer metedata. Note that if it is not called
382// after the entire stream is done, it could return an empty MD. Client
383// side only.
384// It can be safely read only after stream has ended that is either read
385// or write have returned io.EOF.
386func (s *Stream) Trailer() metadata.MD {
387	c := s.trailer.Copy()
388	return c
389}
390
391// ContentSubtype returns the content-subtype for a request. For example, a
392// content-subtype of "proto" will result in a content-type of
393// "application/grpc+proto". This will always be lowercase.  See
394// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
395// more details.
396func (s *Stream) ContentSubtype() string {
397	return s.contentSubtype
398}
399
400// Context returns the context of the stream.
401func (s *Stream) Context() context.Context {
402	return s.ctx
403}
404
405// Method returns the method for the stream.
406func (s *Stream) Method() string {
407	return s.method
408}
409
410// Status returns the status received from the server.
411// Status can be read safely only after the stream has ended,
412// that is, after Done() is closed.
413func (s *Stream) Status() *status.Status {
414	return s.status
415}
416
417// SetHeader sets the header metadata. This can be called multiple times.
418// Server side only.
419// This should not be called in parallel to other data writes.
420func (s *Stream) SetHeader(md metadata.MD) error {
421	if md.Len() == 0 {
422		return nil
423	}
424	if s.isHeaderSent() || s.getState() == streamDone {
425		return ErrIllegalHeaderWrite
426	}
427	s.hdrMu.Lock()
428	s.header = metadata.Join(s.header, md)
429	s.hdrMu.Unlock()
430	return nil
431}
432
433// SendHeader sends the given header metadata. The given metadata is
434// combined with any metadata set by previous calls to SetHeader and
435// then written to the transport stream.
436func (s *Stream) SendHeader(md metadata.MD) error {
437	return s.st.WriteHeader(s, md)
438}
439
440// SetTrailer sets the trailer metadata which will be sent with the RPC status
441// by the server. This can be called multiple times. Server side only.
442// This should not be called parallel to other data writes.
443func (s *Stream) SetTrailer(md metadata.MD) error {
444	if md.Len() == 0 {
445		return nil
446	}
447	if s.getState() == streamDone {
448		return ErrIllegalHeaderWrite
449	}
450	s.hdrMu.Lock()
451	s.trailer = metadata.Join(s.trailer, md)
452	s.hdrMu.Unlock()
453	return nil
454}
455
456func (s *Stream) write(m recvMsg) {
457	s.buf.put(m)
458}
459
460// Read reads all p bytes from the wire for this stream.
461func (s *Stream) Read(p []byte) (n int, err error) {
462	// Don't request a read if there was an error earlier
463	if er := s.trReader.(*transportReader).er; er != nil {
464		return 0, er
465	}
466	s.requestRead(len(p))
467	return io.ReadFull(s.trReader, p)
468}
469
470// tranportReader reads all the data available for this Stream from the transport and
471// passes them into the decoder, which converts them into a gRPC message stream.
472// The error is io.EOF when the stream is done or another non-nil error if
473// the stream broke.
474type transportReader struct {
475	reader io.Reader
476	// The handler to control the window update procedure for both this
477	// particular stream and the associated transport.
478	windowHandler func(int)
479	er            error
480}
481
482func (t *transportReader) Read(p []byte) (n int, err error) {
483	n, err = t.reader.Read(p)
484	if err != nil {
485		t.er = err
486		return
487	}
488	t.windowHandler(n)
489	return
490}
491
492// BytesReceived indicates whether any bytes have been received on this stream.
493func (s *Stream) BytesReceived() bool {
494	return atomic.LoadUint32(&s.bytesReceived) == 1
495}
496
497// Unprocessed indicates whether the server did not process this stream --
498// i.e. it sent a refused stream or GOAWAY including this stream ID.
499func (s *Stream) Unprocessed() bool {
500	return atomic.LoadUint32(&s.unprocessed) == 1
501}
502
503// GoString is implemented by Stream so context.String() won't
504// race when printing %#v.
505func (s *Stream) GoString() string {
506	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
507}
508
509// state of transport
510type transportState int
511
512const (
513	reachable transportState = iota
514	closing
515	draining
516)
517
518// ServerConfig consists of all the configurations to establish a server transport.
519type ServerConfig struct {
520	MaxStreams            uint32
521	AuthInfo              credentials.AuthInfo
522	InTapHandle           tap.ServerInHandle
523	StatsHandler          stats.Handler
524	KeepaliveParams       keepalive.ServerParameters
525	KeepalivePolicy       keepalive.EnforcementPolicy
526	InitialWindowSize     int32
527	InitialConnWindowSize int32
528	WriteBufferSize       int
529	ReadBufferSize        int
530	ChannelzParentID      int64
531	MaxHeaderListSize     *uint32
532	HeaderTableSize       *uint32
533}
534
535// NewServerTransport creates a ServerTransport with conn or non-nil error
536// if it fails.
537func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
538	return newHTTP2Server(conn, config)
539}
540
541// ConnectOptions covers all relevant options for communicating with the server.
542type ConnectOptions struct {
543	// UserAgent is the application user agent.
544	UserAgent string
545	// Dialer specifies how to dial a network address.
546	Dialer func(context.Context, string) (net.Conn, error)
547	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
548	FailOnNonTempDialError bool
549	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
550	PerRPCCredentials []credentials.PerRPCCredentials
551	// TransportCredentials stores the Authenticator required to setup a client
552	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
553	TransportCredentials credentials.TransportCredentials
554	// CredsBundle is the credentials bundle to be used. Only one of
555	// TransportCredentials and CredsBundle is non-nil.
556	CredsBundle credentials.Bundle
557	// KeepaliveParams stores the keepalive parameters.
558	KeepaliveParams keepalive.ClientParameters
559	// StatsHandler stores the handler for stats.
560	StatsHandler stats.Handler
561	// InitialWindowSize sets the initial window size for a stream.
562	InitialWindowSize int32
563	// InitialConnWindowSize sets the initial window size for a connection.
564	InitialConnWindowSize int32
565	// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
566	WriteBufferSize int
567	// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
568	ReadBufferSize int
569	// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
570	ChannelzParentID int64
571	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
572	MaxHeaderListSize *uint32
573	// UseProxy specifies if a proxy should be used.
574	UseProxy bool
575}
576
577// NewClientTransport establishes the transport with the required ConnectOptions
578// and returns it to the caller.
579func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
580	return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose)
581}
582
583// Options provides additional hints and information for message
584// transmission.
585type Options struct {
586	// Last indicates whether this write is the last piece for
587	// this stream.
588	Last bool
589}
590
591// CallHdr carries the information of a particular RPC.
592type CallHdr struct {
593	// Host specifies the peer's host.
594	Host string
595
596	// Method specifies the operation to perform.
597	Method string
598
599	// SendCompress specifies the compression algorithm applied on
600	// outbound message.
601	SendCompress string
602
603	// Creds specifies credentials.PerRPCCredentials for a call.
604	Creds credentials.PerRPCCredentials
605
606	// ContentSubtype specifies the content-subtype for a request. For example, a
607	// content-subtype of "proto" will result in a content-type of
608	// "application/grpc+proto". The value of ContentSubtype must be all
609	// lowercase, otherwise the behavior is undefined. See
610	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
611	// for more details.
612	ContentSubtype string
613
614	PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
615
616	DoneFunc func() // called when the stream is finished
617}
618
619// ClientTransport is the common interface for all gRPC client-side transport
620// implementations.
621type ClientTransport interface {
622	// Close tears down this transport. Once it returns, the transport
623	// should not be accessed any more. The caller must make sure this
624	// is called only once.
625	Close(err error)
626
627	// GracefulClose starts to tear down the transport: the transport will stop
628	// accepting new RPCs and NewStream will return error. Once all streams are
629	// finished, the transport will close.
630	//
631	// It does not block.
632	GracefulClose()
633
634	// Write sends the data for the given stream. A nil stream indicates
635	// the write is to be performed on the transport as a whole.
636	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
637
638	// NewStream creates a Stream for an RPC.
639	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
640
641	// CloseStream clears the footprint of a stream when the stream is
642	// not needed any more. The err indicates the error incurred when
643	// CloseStream is called. Must be called when a stream is finished
644	// unless the associated transport is closing.
645	CloseStream(stream *Stream, err error)
646
647	// Error returns a channel that is closed when some I/O error
648	// happens. Typically the caller should have a goroutine to monitor
649	// this in order to take action (e.g., close the current transport
650	// and create a new one) in error case. It should not return nil
651	// once the transport is initiated.
652	Error() <-chan struct{}
653
654	// GoAway returns a channel that is closed when ClientTransport
655	// receives the draining signal from the server (e.g., GOAWAY frame in
656	// HTTP/2).
657	GoAway() <-chan struct{}
658
659	// GetGoAwayReason returns the reason why GoAway frame was received, along
660	// with a human readable string with debug info.
661	GetGoAwayReason() (GoAwayReason, string)
662
663	// RemoteAddr returns the remote network address.
664	RemoteAddr() net.Addr
665
666	// IncrMsgSent increments the number of message sent through this transport.
667	IncrMsgSent()
668
669	// IncrMsgRecv increments the number of message received through this transport.
670	IncrMsgRecv()
671}
672
673// ServerTransport is the common interface for all gRPC server-side transport
674// implementations.
675//
676// Methods may be called concurrently from multiple goroutines, but
677// Write methods for a given Stream will be called serially.
678type ServerTransport interface {
679	// HandleStreams receives incoming streams using the given handler.
680	HandleStreams(func(*Stream), func(context.Context, string) context.Context)
681
682	// WriteHeader sends the header metadata for the given stream.
683	// WriteHeader may not be called on all streams.
684	WriteHeader(s *Stream, md metadata.MD) error
685
686	// Write sends the data for the given stream.
687	// Write may not be called on all streams.
688	Write(s *Stream, hdr []byte, data []byte, opts *Options) error
689
690	// WriteStatus sends the status of a stream to the client.  WriteStatus is
691	// the final call made on a stream and always occurs.
692	WriteStatus(s *Stream, st *status.Status) error
693
694	// Close tears down the transport. Once it is called, the transport
695	// should not be accessed any more. All the pending streams and their
696	// handlers will be terminated asynchronously.
697	Close() error
698
699	// RemoteAddr returns the remote network address.
700	RemoteAddr() net.Addr
701
702	// Drain notifies the client this ServerTransport stops accepting new RPCs.
703	Drain()
704
705	// IncrMsgSent increments the number of message sent through this transport.
706	IncrMsgSent()
707
708	// IncrMsgRecv increments the number of message received through this transport.
709	IncrMsgRecv()
710}
711
712// connectionErrorf creates an ConnectionError with the specified error description.
713func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
714	return ConnectionError{
715		Desc: fmt.Sprintf(format, a...),
716		temp: temp,
717		err:  e,
718	}
719}
720
721// ConnectionError is an error that results in the termination of the
722// entire connection and the retry of all the active streams.
723type ConnectionError struct {
724	Desc string
725	temp bool
726	err  error
727}
728
729func (e ConnectionError) Error() string {
730	return fmt.Sprintf("connection error: desc = %q", e.Desc)
731}
732
733// Temporary indicates if this connection error is temporary or fatal.
734func (e ConnectionError) Temporary() bool {
735	return e.temp
736}
737
738// Origin returns the original error of this connection error.
739func (e ConnectionError) Origin() error {
740	// Never return nil error here.
741	// If the original error is nil, return itself.
742	if e.err == nil {
743		return e
744	}
745	return e.err
746}
747
748var (
749	// ErrConnClosing indicates that the transport is closing.
750	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
751	// errStreamDrain indicates that the stream is rejected because the
752	// connection is draining. This could be caused by goaway or balancer
753	// removing the address.
754	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
755	// errStreamDone is returned from write at the client side to indiacte application
756	// layer of an error.
757	errStreamDone = errors.New("the stream is done")
758	// StatusGoAway indicates that the server sent a GOAWAY that included this
759	// stream's ID in unprocessed RPCs.
760	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
761)
762
763// GoAwayReason contains the reason for the GoAway frame received.
764type GoAwayReason uint8
765
766const (
767	// GoAwayInvalid indicates that no GoAway frame is received.
768	GoAwayInvalid GoAwayReason = 0
769	// GoAwayNoReason is the default value when GoAway frame is received.
770	GoAwayNoReason GoAwayReason = 1
771	// GoAwayTooManyPings indicates that a GoAway frame with
772	// ErrCodeEnhanceYourCalm was received and that the debug data said
773	// "too_many_pings".
774	GoAwayTooManyPings GoAwayReason = 2
775)
776
777// channelzData is used to store channelz related data for http2Client and http2Server.
778// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
779// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
780// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
781type channelzData struct {
782	kpCount int64
783	// The number of streams that have started, including already finished ones.
784	streamsStarted int64
785	// Client side: The number of streams that have ended successfully by receiving
786	// EoS bit set frame from server.
787	// Server side: The number of streams that have ended successfully by sending
788	// frame with EoS bit set.
789	streamsSucceeded int64
790	streamsFailed    int64
791	// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
792	// instead of time.Time since it's more costly to atomically update time.Time variable than int64
793	// variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
794	lastStreamCreatedTime int64
795	msgSent               int64
796	msgRecv               int64
797	lastMsgSentTime       int64
798	lastMsgRecvTime       int64
799}
800
801// ContextErr converts the error from context package into a status error.
802func ContextErr(err error) error {
803	switch err {
804	case context.DeadlineExceeded:
805		return status.Error(codes.DeadlineExceeded, err.Error())
806	case context.Canceled:
807		return status.Error(codes.Canceled, err.Error())
808	}
809	return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
810}
811