1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package transport defines and implements message oriented communication 20// channel to complete various transactions (e.g., an RPC). It is meant for 21// grpc-internal usage and is not intended to be imported directly by users. 22package transport 23 24import ( 25 "bytes" 26 "context" 27 "errors" 28 "fmt" 29 "io" 30 "net" 31 "sync" 32 "sync/atomic" 33 34 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/credentials" 36 "google.golang.org/grpc/keepalive" 37 "google.golang.org/grpc/metadata" 38 "google.golang.org/grpc/resolver" 39 "google.golang.org/grpc/stats" 40 "google.golang.org/grpc/status" 41 "google.golang.org/grpc/tap" 42) 43 44const logLevel = 2 45 46type bufferPool struct { 47 pool sync.Pool 48} 49 50func newBufferPool() *bufferPool { 51 return &bufferPool{ 52 pool: sync.Pool{ 53 New: func() interface{} { 54 return new(bytes.Buffer) 55 }, 56 }, 57 } 58} 59 60func (p *bufferPool) get() *bytes.Buffer { 61 return p.pool.Get().(*bytes.Buffer) 62} 63 64func (p *bufferPool) put(b *bytes.Buffer) { 65 p.pool.Put(b) 66} 67 68// recvMsg represents the received msg from the transport. All transport 69// protocol specific info has been removed. 70type recvMsg struct { 71 buffer *bytes.Buffer 72 // nil: received some data 73 // io.EOF: stream is completed. data is nil. 74 // other non-nil error: transport failure. data is nil. 75 err error 76} 77 78// recvBuffer is an unbounded channel of recvMsg structs. 79// 80// Note: recvBuffer differs from buffer.Unbounded only in the fact that it 81// holds a channel of recvMsg structs instead of objects implementing "item" 82// interface. recvBuffer is written to much more often and using strict recvMsg 83// structs helps avoid allocation in "recvBuffer.put" 84type recvBuffer struct { 85 c chan recvMsg 86 mu sync.Mutex 87 backlog []recvMsg 88 err error 89} 90 91func newRecvBuffer() *recvBuffer { 92 b := &recvBuffer{ 93 c: make(chan recvMsg, 1), 94 } 95 return b 96} 97 98func (b *recvBuffer) put(r recvMsg) { 99 b.mu.Lock() 100 if b.err != nil { 101 b.mu.Unlock() 102 // An error had occurred earlier, don't accept more 103 // data or errors. 104 return 105 } 106 b.err = r.err 107 if len(b.backlog) == 0 { 108 select { 109 case b.c <- r: 110 b.mu.Unlock() 111 return 112 default: 113 } 114 } 115 b.backlog = append(b.backlog, r) 116 b.mu.Unlock() 117} 118 119func (b *recvBuffer) load() { 120 b.mu.Lock() 121 if len(b.backlog) > 0 { 122 select { 123 case b.c <- b.backlog[0]: 124 b.backlog[0] = recvMsg{} 125 b.backlog = b.backlog[1:] 126 default: 127 } 128 } 129 b.mu.Unlock() 130} 131 132// get returns the channel that receives a recvMsg in the buffer. 133// 134// Upon receipt of a recvMsg, the caller should call load to send another 135// recvMsg onto the channel if there is any. 136func (b *recvBuffer) get() <-chan recvMsg { 137 return b.c 138} 139 140// recvBufferReader implements io.Reader interface to read the data from 141// recvBuffer. 142type recvBufferReader struct { 143 closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata. 144 ctx context.Context 145 ctxDone <-chan struct{} // cache of ctx.Done() (for performance). 146 recv *recvBuffer 147 last *bytes.Buffer // Stores the remaining data in the previous calls. 148 err error 149 freeBuffer func(*bytes.Buffer) 150} 151 152// Read reads the next len(p) bytes from last. If last is drained, it tries to 153// read additional data from recv. It blocks if there no additional data available 154// in recv. If Read returns any non-nil error, it will continue to return that error. 155func (r *recvBufferReader) Read(p []byte) (n int, err error) { 156 if r.err != nil { 157 return 0, r.err 158 } 159 if r.last != nil { 160 // Read remaining data left in last call. 161 copied, _ := r.last.Read(p) 162 if r.last.Len() == 0 { 163 r.freeBuffer(r.last) 164 r.last = nil 165 } 166 return copied, nil 167 } 168 if r.closeStream != nil { 169 n, r.err = r.readClient(p) 170 } else { 171 n, r.err = r.read(p) 172 } 173 return n, r.err 174} 175 176func (r *recvBufferReader) read(p []byte) (n int, err error) { 177 select { 178 case <-r.ctxDone: 179 return 0, ContextErr(r.ctx.Err()) 180 case m := <-r.recv.get(): 181 return r.readAdditional(m, p) 182 } 183} 184 185func (r *recvBufferReader) readClient(p []byte) (n int, err error) { 186 // If the context is canceled, then closes the stream with nil metadata. 187 // closeStream writes its error parameter to r.recv as a recvMsg. 188 // r.readAdditional acts on that message and returns the necessary error. 189 select { 190 case <-r.ctxDone: 191 // Note that this adds the ctx error to the end of recv buffer, and 192 // reads from the head. This will delay the error until recv buffer is 193 // empty, thus will delay ctx cancellation in Recv(). 194 // 195 // It's done this way to fix a race between ctx cancel and trailer. The 196 // race was, stream.Recv() may return ctx error if ctxDone wins the 197 // race, but stream.Trailer() may return a non-nil md because the stream 198 // was not marked as done when trailer is received. This closeStream 199 // call will mark stream as done, thus fix the race. 200 // 201 // TODO: delaying ctx error seems like a unnecessary side effect. What 202 // we really want is to mark the stream as done, and return ctx error 203 // faster. 204 r.closeStream(ContextErr(r.ctx.Err())) 205 m := <-r.recv.get() 206 return r.readAdditional(m, p) 207 case m := <-r.recv.get(): 208 return r.readAdditional(m, p) 209 } 210} 211 212func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) { 213 r.recv.load() 214 if m.err != nil { 215 return 0, m.err 216 } 217 copied, _ := m.buffer.Read(p) 218 if m.buffer.Len() == 0 { 219 r.freeBuffer(m.buffer) 220 r.last = nil 221 } else { 222 r.last = m.buffer 223 } 224 return copied, nil 225} 226 227type streamState uint32 228 229const ( 230 streamActive streamState = iota 231 streamWriteDone // EndStream sent 232 streamReadDone // EndStream received 233 streamDone // the entire stream is finished. 234) 235 236// Stream represents an RPC in the transport layer. 237type Stream struct { 238 id uint32 239 st ServerTransport // nil for client side Stream 240 ct *http2Client // nil for server side Stream 241 ctx context.Context // the associated context of the stream 242 cancel context.CancelFunc // always nil for client side Stream 243 done chan struct{} // closed at the end of stream to unblock writers. On the client side. 244 doneFunc func() // invoked at the end of stream on client side. 245 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) 246 method string // the associated RPC method of the stream 247 recvCompress string 248 sendCompress string 249 buf *recvBuffer 250 trReader io.Reader 251 fc *inFlow 252 wq *writeQuota 253 254 // Callback to state application's intentions to read data. This 255 // is used to adjust flow control, if needed. 256 requestRead func(int) 257 258 headerChan chan struct{} // closed to indicate the end of header metadata. 259 headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. 260 // headerValid indicates whether a valid header was received. Only 261 // meaningful after headerChan is closed (always call waitOnHeader() before 262 // reading its value). Not valid on server side. 263 headerValid bool 264 265 // hdrMu protects header and trailer metadata on the server-side. 266 hdrMu sync.Mutex 267 // On client side, header keeps the received header metadata. 268 // 269 // On server side, header keeps the header set by SetHeader(). The complete 270 // header will merged into this after t.WriteHeader() is called. 271 header metadata.MD 272 trailer metadata.MD // the key-value map of trailer metadata. 273 274 noHeaders bool // set if the client never received headers (set only after the stream is done). 275 276 // On the server-side, headerSent is atomically set to 1 when the headers are sent out. 277 headerSent uint32 278 279 state streamState 280 281 // On client-side it is the status error received from the server. 282 // On server-side it is unused. 283 status *status.Status 284 285 bytesReceived uint32 // indicates whether any bytes have been received on this stream 286 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream 287 288 // contentSubtype is the content-subtype for requests. 289 // this must be lowercase or the behavior is undefined. 290 contentSubtype string 291} 292 293// isHeaderSent is only valid on the server-side. 294func (s *Stream) isHeaderSent() bool { 295 return atomic.LoadUint32(&s.headerSent) == 1 296} 297 298// updateHeaderSent updates headerSent and returns true 299// if it was alreay set. It is valid only on server-side. 300func (s *Stream) updateHeaderSent() bool { 301 return atomic.SwapUint32(&s.headerSent, 1) == 1 302} 303 304func (s *Stream) swapState(st streamState) streamState { 305 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) 306} 307 308func (s *Stream) compareAndSwapState(oldState, newState streamState) bool { 309 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState)) 310} 311 312func (s *Stream) getState() streamState { 313 return streamState(atomic.LoadUint32((*uint32)(&s.state))) 314} 315 316func (s *Stream) waitOnHeader() { 317 if s.headerChan == nil { 318 // On the server headerChan is always nil since a stream originates 319 // only after having received headers. 320 return 321 } 322 select { 323 case <-s.ctx.Done(): 324 // Close the stream to prevent headers/trailers from changing after 325 // this function returns. 326 s.ct.CloseStream(s, ContextErr(s.ctx.Err())) 327 // headerChan could possibly not be closed yet if closeStream raced 328 // with operateHeaders; wait until it is closed explicitly here. 329 <-s.headerChan 330 case <-s.headerChan: 331 } 332} 333 334// RecvCompress returns the compression algorithm applied to the inbound 335// message. It is empty string if there is no compression applied. 336func (s *Stream) RecvCompress() string { 337 s.waitOnHeader() 338 return s.recvCompress 339} 340 341// SetSendCompress sets the compression algorithm to the stream. 342func (s *Stream) SetSendCompress(str string) { 343 s.sendCompress = str 344} 345 346// Done returns a channel which is closed when it receives the final status 347// from the server. 348func (s *Stream) Done() <-chan struct{} { 349 return s.done 350} 351 352// Header returns the header metadata of the stream. 353// 354// On client side, it acquires the key-value pairs of header metadata once it is 355// available. It blocks until i) the metadata is ready or ii) there is no header 356// metadata or iii) the stream is canceled/expired. 357// 358// On server side, it returns the out header after t.WriteHeader is called. It 359// does not block and must not be called until after WriteHeader. 360func (s *Stream) Header() (metadata.MD, error) { 361 if s.headerChan == nil { 362 // On server side, return the header in stream. It will be the out 363 // header after t.WriteHeader is called. 364 return s.header.Copy(), nil 365 } 366 s.waitOnHeader() 367 if !s.headerValid { 368 return nil, s.status.Err() 369 } 370 return s.header.Copy(), nil 371} 372 373// TrailersOnly blocks until a header or trailers-only frame is received and 374// then returns true if the stream was trailers-only. If the stream ends 375// before headers are received, returns true, nil. Client-side only. 376func (s *Stream) TrailersOnly() bool { 377 s.waitOnHeader() 378 return s.noHeaders 379} 380 381// Trailer returns the cached trailer metedata. Note that if it is not called 382// after the entire stream is done, it could return an empty MD. Client 383// side only. 384// It can be safely read only after stream has ended that is either read 385// or write have returned io.EOF. 386func (s *Stream) Trailer() metadata.MD { 387 c := s.trailer.Copy() 388 return c 389} 390 391// ContentSubtype returns the content-subtype for a request. For example, a 392// content-subtype of "proto" will result in a content-type of 393// "application/grpc+proto". This will always be lowercase. See 394// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for 395// more details. 396func (s *Stream) ContentSubtype() string { 397 return s.contentSubtype 398} 399 400// Context returns the context of the stream. 401func (s *Stream) Context() context.Context { 402 return s.ctx 403} 404 405// Method returns the method for the stream. 406func (s *Stream) Method() string { 407 return s.method 408} 409 410// Status returns the status received from the server. 411// Status can be read safely only after the stream has ended, 412// that is, after Done() is closed. 413func (s *Stream) Status() *status.Status { 414 return s.status 415} 416 417// SetHeader sets the header metadata. This can be called multiple times. 418// Server side only. 419// This should not be called in parallel to other data writes. 420func (s *Stream) SetHeader(md metadata.MD) error { 421 if md.Len() == 0 { 422 return nil 423 } 424 if s.isHeaderSent() || s.getState() == streamDone { 425 return ErrIllegalHeaderWrite 426 } 427 s.hdrMu.Lock() 428 s.header = metadata.Join(s.header, md) 429 s.hdrMu.Unlock() 430 return nil 431} 432 433// SendHeader sends the given header metadata. The given metadata is 434// combined with any metadata set by previous calls to SetHeader and 435// then written to the transport stream. 436func (s *Stream) SendHeader(md metadata.MD) error { 437 return s.st.WriteHeader(s, md) 438} 439 440// SetTrailer sets the trailer metadata which will be sent with the RPC status 441// by the server. This can be called multiple times. Server side only. 442// This should not be called parallel to other data writes. 443func (s *Stream) SetTrailer(md metadata.MD) error { 444 if md.Len() == 0 { 445 return nil 446 } 447 if s.getState() == streamDone { 448 return ErrIllegalHeaderWrite 449 } 450 s.hdrMu.Lock() 451 s.trailer = metadata.Join(s.trailer, md) 452 s.hdrMu.Unlock() 453 return nil 454} 455 456func (s *Stream) write(m recvMsg) { 457 s.buf.put(m) 458} 459 460// Read reads all p bytes from the wire for this stream. 461func (s *Stream) Read(p []byte) (n int, err error) { 462 // Don't request a read if there was an error earlier 463 if er := s.trReader.(*transportReader).er; er != nil { 464 return 0, er 465 } 466 s.requestRead(len(p)) 467 return io.ReadFull(s.trReader, p) 468} 469 470// tranportReader reads all the data available for this Stream from the transport and 471// passes them into the decoder, which converts them into a gRPC message stream. 472// The error is io.EOF when the stream is done or another non-nil error if 473// the stream broke. 474type transportReader struct { 475 reader io.Reader 476 // The handler to control the window update procedure for both this 477 // particular stream and the associated transport. 478 windowHandler func(int) 479 er error 480} 481 482func (t *transportReader) Read(p []byte) (n int, err error) { 483 n, err = t.reader.Read(p) 484 if err != nil { 485 t.er = err 486 return 487 } 488 t.windowHandler(n) 489 return 490} 491 492// BytesReceived indicates whether any bytes have been received on this stream. 493func (s *Stream) BytesReceived() bool { 494 return atomic.LoadUint32(&s.bytesReceived) == 1 495} 496 497// Unprocessed indicates whether the server did not process this stream -- 498// i.e. it sent a refused stream or GOAWAY including this stream ID. 499func (s *Stream) Unprocessed() bool { 500 return atomic.LoadUint32(&s.unprocessed) == 1 501} 502 503// GoString is implemented by Stream so context.String() won't 504// race when printing %#v. 505func (s *Stream) GoString() string { 506 return fmt.Sprintf("<stream: %p, %v>", s, s.method) 507} 508 509// state of transport 510type transportState int 511 512const ( 513 reachable transportState = iota 514 closing 515 draining 516) 517 518// ServerConfig consists of all the configurations to establish a server transport. 519type ServerConfig struct { 520 MaxStreams uint32 521 AuthInfo credentials.AuthInfo 522 InTapHandle tap.ServerInHandle 523 StatsHandler stats.Handler 524 KeepaliveParams keepalive.ServerParameters 525 KeepalivePolicy keepalive.EnforcementPolicy 526 InitialWindowSize int32 527 InitialConnWindowSize int32 528 WriteBufferSize int 529 ReadBufferSize int 530 ChannelzParentID int64 531 MaxHeaderListSize *uint32 532 HeaderTableSize *uint32 533} 534 535// NewServerTransport creates a ServerTransport with conn or non-nil error 536// if it fails. 537func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) { 538 return newHTTP2Server(conn, config) 539} 540 541// ConnectOptions covers all relevant options for communicating with the server. 542type ConnectOptions struct { 543 // UserAgent is the application user agent. 544 UserAgent string 545 // Dialer specifies how to dial a network address. 546 Dialer func(context.Context, string) (net.Conn, error) 547 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. 548 FailOnNonTempDialError bool 549 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. 550 PerRPCCredentials []credentials.PerRPCCredentials 551 // TransportCredentials stores the Authenticator required to setup a client 552 // connection. Only one of TransportCredentials and CredsBundle is non-nil. 553 TransportCredentials credentials.TransportCredentials 554 // CredsBundle is the credentials bundle to be used. Only one of 555 // TransportCredentials and CredsBundle is non-nil. 556 CredsBundle credentials.Bundle 557 // KeepaliveParams stores the keepalive parameters. 558 KeepaliveParams keepalive.ClientParameters 559 // StatsHandler stores the handler for stats. 560 StatsHandler stats.Handler 561 // InitialWindowSize sets the initial window size for a stream. 562 InitialWindowSize int32 563 // InitialConnWindowSize sets the initial window size for a connection. 564 InitialConnWindowSize int32 565 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire. 566 WriteBufferSize int 567 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. 568 ReadBufferSize int 569 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. 570 ChannelzParentID int64 571 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. 572 MaxHeaderListSize *uint32 573 // UseProxy specifies if a proxy should be used. 574 UseProxy bool 575} 576 577// NewClientTransport establishes the transport with the required ConnectOptions 578// and returns it to the caller. 579func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) { 580 return newHTTP2Client(connectCtx, ctx, addr, opts, onPrefaceReceipt, onGoAway, onClose) 581} 582 583// Options provides additional hints and information for message 584// transmission. 585type Options struct { 586 // Last indicates whether this write is the last piece for 587 // this stream. 588 Last bool 589} 590 591// CallHdr carries the information of a particular RPC. 592type CallHdr struct { 593 // Host specifies the peer's host. 594 Host string 595 596 // Method specifies the operation to perform. 597 Method string 598 599 // SendCompress specifies the compression algorithm applied on 600 // outbound message. 601 SendCompress string 602 603 // Creds specifies credentials.PerRPCCredentials for a call. 604 Creds credentials.PerRPCCredentials 605 606 // ContentSubtype specifies the content-subtype for a request. For example, a 607 // content-subtype of "proto" will result in a content-type of 608 // "application/grpc+proto". The value of ContentSubtype must be all 609 // lowercase, otherwise the behavior is undefined. See 610 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests 611 // for more details. 612 ContentSubtype string 613 614 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set 615 616 DoneFunc func() // called when the stream is finished 617} 618 619// ClientTransport is the common interface for all gRPC client-side transport 620// implementations. 621type ClientTransport interface { 622 // Close tears down this transport. Once it returns, the transport 623 // should not be accessed any more. The caller must make sure this 624 // is called only once. 625 Close(err error) 626 627 // GracefulClose starts to tear down the transport: the transport will stop 628 // accepting new RPCs and NewStream will return error. Once all streams are 629 // finished, the transport will close. 630 // 631 // It does not block. 632 GracefulClose() 633 634 // Write sends the data for the given stream. A nil stream indicates 635 // the write is to be performed on the transport as a whole. 636 Write(s *Stream, hdr []byte, data []byte, opts *Options) error 637 638 // NewStream creates a Stream for an RPC. 639 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) 640 641 // CloseStream clears the footprint of a stream when the stream is 642 // not needed any more. The err indicates the error incurred when 643 // CloseStream is called. Must be called when a stream is finished 644 // unless the associated transport is closing. 645 CloseStream(stream *Stream, err error) 646 647 // Error returns a channel that is closed when some I/O error 648 // happens. Typically the caller should have a goroutine to monitor 649 // this in order to take action (e.g., close the current transport 650 // and create a new one) in error case. It should not return nil 651 // once the transport is initiated. 652 Error() <-chan struct{} 653 654 // GoAway returns a channel that is closed when ClientTransport 655 // receives the draining signal from the server (e.g., GOAWAY frame in 656 // HTTP/2). 657 GoAway() <-chan struct{} 658 659 // GetGoAwayReason returns the reason why GoAway frame was received, along 660 // with a human readable string with debug info. 661 GetGoAwayReason() (GoAwayReason, string) 662 663 // RemoteAddr returns the remote network address. 664 RemoteAddr() net.Addr 665 666 // IncrMsgSent increments the number of message sent through this transport. 667 IncrMsgSent() 668 669 // IncrMsgRecv increments the number of message received through this transport. 670 IncrMsgRecv() 671} 672 673// ServerTransport is the common interface for all gRPC server-side transport 674// implementations. 675// 676// Methods may be called concurrently from multiple goroutines, but 677// Write methods for a given Stream will be called serially. 678type ServerTransport interface { 679 // HandleStreams receives incoming streams using the given handler. 680 HandleStreams(func(*Stream), func(context.Context, string) context.Context) 681 682 // WriteHeader sends the header metadata for the given stream. 683 // WriteHeader may not be called on all streams. 684 WriteHeader(s *Stream, md metadata.MD) error 685 686 // Write sends the data for the given stream. 687 // Write may not be called on all streams. 688 Write(s *Stream, hdr []byte, data []byte, opts *Options) error 689 690 // WriteStatus sends the status of a stream to the client. WriteStatus is 691 // the final call made on a stream and always occurs. 692 WriteStatus(s *Stream, st *status.Status) error 693 694 // Close tears down the transport. Once it is called, the transport 695 // should not be accessed any more. All the pending streams and their 696 // handlers will be terminated asynchronously. 697 Close() error 698 699 // RemoteAddr returns the remote network address. 700 RemoteAddr() net.Addr 701 702 // Drain notifies the client this ServerTransport stops accepting new RPCs. 703 Drain() 704 705 // IncrMsgSent increments the number of message sent through this transport. 706 IncrMsgSent() 707 708 // IncrMsgRecv increments the number of message received through this transport. 709 IncrMsgRecv() 710} 711 712// connectionErrorf creates an ConnectionError with the specified error description. 713func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { 714 return ConnectionError{ 715 Desc: fmt.Sprintf(format, a...), 716 temp: temp, 717 err: e, 718 } 719} 720 721// ConnectionError is an error that results in the termination of the 722// entire connection and the retry of all the active streams. 723type ConnectionError struct { 724 Desc string 725 temp bool 726 err error 727} 728 729func (e ConnectionError) Error() string { 730 return fmt.Sprintf("connection error: desc = %q", e.Desc) 731} 732 733// Temporary indicates if this connection error is temporary or fatal. 734func (e ConnectionError) Temporary() bool { 735 return e.temp 736} 737 738// Origin returns the original error of this connection error. 739func (e ConnectionError) Origin() error { 740 // Never return nil error here. 741 // If the original error is nil, return itself. 742 if e.err == nil { 743 return e 744 } 745 return e.err 746} 747 748var ( 749 // ErrConnClosing indicates that the transport is closing. 750 ErrConnClosing = connectionErrorf(true, nil, "transport is closing") 751 // errStreamDrain indicates that the stream is rejected because the 752 // connection is draining. This could be caused by goaway or balancer 753 // removing the address. 754 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining") 755 // errStreamDone is returned from write at the client side to indiacte application 756 // layer of an error. 757 errStreamDone = errors.New("the stream is done") 758 // StatusGoAway indicates that the server sent a GOAWAY that included this 759 // stream's ID in unprocessed RPCs. 760 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") 761) 762 763// GoAwayReason contains the reason for the GoAway frame received. 764type GoAwayReason uint8 765 766const ( 767 // GoAwayInvalid indicates that no GoAway frame is received. 768 GoAwayInvalid GoAwayReason = 0 769 // GoAwayNoReason is the default value when GoAway frame is received. 770 GoAwayNoReason GoAwayReason = 1 771 // GoAwayTooManyPings indicates that a GoAway frame with 772 // ErrCodeEnhanceYourCalm was received and that the debug data said 773 // "too_many_pings". 774 GoAwayTooManyPings GoAwayReason = 2 775) 776 777// channelzData is used to store channelz related data for http2Client and http2Server. 778// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic 779// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment. 780// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment. 781type channelzData struct { 782 kpCount int64 783 // The number of streams that have started, including already finished ones. 784 streamsStarted int64 785 // Client side: The number of streams that have ended successfully by receiving 786 // EoS bit set frame from server. 787 // Server side: The number of streams that have ended successfully by sending 788 // frame with EoS bit set. 789 streamsSucceeded int64 790 streamsFailed int64 791 // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type 792 // instead of time.Time since it's more costly to atomically update time.Time variable than int64 793 // variable. The same goes for lastMsgSentTime and lastMsgRecvTime. 794 lastStreamCreatedTime int64 795 msgSent int64 796 msgRecv int64 797 lastMsgSentTime int64 798 lastMsgRecvTime int64 799} 800 801// ContextErr converts the error from context package into a status error. 802func ContextErr(err error) error { 803 switch err { 804 case context.DeadlineExceeded: 805 return status.Error(codes.DeadlineExceeded, err.Error()) 806 case context.Canceled: 807 return status.Error(codes.Canceled, err.Error()) 808 } 809 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err) 810} 811