1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package transport defines and implements message oriented communication 20// channel to complete various transactions (e.g., an RPC). It is meant for 21// grpc-internal usage and is not intended to be imported directly by users. 22package transport // externally used as import "google.golang.org/grpc/transport" 23 24import ( 25 "errors" 26 "fmt" 27 "io" 28 "net" 29 "sync" 30 "sync/atomic" 31 32 "golang.org/x/net/context" 33 "google.golang.org/grpc/codes" 34 "google.golang.org/grpc/credentials" 35 "google.golang.org/grpc/keepalive" 36 "google.golang.org/grpc/metadata" 37 "google.golang.org/grpc/stats" 38 "google.golang.org/grpc/status" 39 "google.golang.org/grpc/tap" 40) 41 42// recvMsg represents the received msg from the transport. All transport 43// protocol specific info has been removed. 44type recvMsg struct { 45 data []byte 46 // nil: received some data 47 // io.EOF: stream is completed. data is nil. 48 // other non-nil error: transport failure. data is nil. 49 err error 50} 51 52// recvBuffer is an unbounded channel of recvMsg structs. 53// Note recvBuffer differs from controlBuffer only in that recvBuffer 54// holds a channel of only recvMsg structs instead of objects implementing "item" interface. 55// recvBuffer is written to much more often than 56// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put" 57type recvBuffer struct { 58 c chan recvMsg 59 mu sync.Mutex 60 backlog []recvMsg 61 err error 62} 63 64func newRecvBuffer() *recvBuffer { 65 b := &recvBuffer{ 66 c: make(chan recvMsg, 1), 67 } 68 return b 69} 70 71func (b *recvBuffer) put(r recvMsg) { 72 b.mu.Lock() 73 if b.err != nil { 74 b.mu.Unlock() 75 // An error had occurred earlier, don't accept more 76 // data or errors. 77 return 78 } 79 b.err = r.err 80 if len(b.backlog) == 0 { 81 select { 82 case b.c <- r: 83 b.mu.Unlock() 84 return 85 default: 86 } 87 } 88 b.backlog = append(b.backlog, r) 89 b.mu.Unlock() 90} 91 92func (b *recvBuffer) load() { 93 b.mu.Lock() 94 if len(b.backlog) > 0 { 95 select { 96 case b.c <- b.backlog[0]: 97 b.backlog[0] = recvMsg{} 98 b.backlog = b.backlog[1:] 99 default: 100 } 101 } 102 b.mu.Unlock() 103} 104 105// get returns the channel that receives a recvMsg in the buffer. 106// 107// Upon receipt of a recvMsg, the caller should call load to send another 108// recvMsg onto the channel if there is any. 109func (b *recvBuffer) get() <-chan recvMsg { 110 return b.c 111} 112 113// 114// recvBufferReader implements io.Reader interface to read the data from 115// recvBuffer. 116type recvBufferReader struct { 117 ctx context.Context 118 ctxDone <-chan struct{} // cache of ctx.Done() (for performance). 119 recv *recvBuffer 120 last []byte // Stores the remaining data in the previous calls. 121 err error 122} 123 124// Read reads the next len(p) bytes from last. If last is drained, it tries to 125// read additional data from recv. It blocks if there no additional data available 126// in recv. If Read returns any non-nil error, it will continue to return that error. 127func (r *recvBufferReader) Read(p []byte) (n int, err error) { 128 if r.err != nil { 129 return 0, r.err 130 } 131 n, r.err = r.read(p) 132 return n, r.err 133} 134 135func (r *recvBufferReader) read(p []byte) (n int, err error) { 136 if r.last != nil && len(r.last) > 0 { 137 // Read remaining data left in last call. 138 copied := copy(p, r.last) 139 r.last = r.last[copied:] 140 return copied, nil 141 } 142 select { 143 case <-r.ctxDone: 144 return 0, ContextErr(r.ctx.Err()) 145 case m := <-r.recv.get(): 146 r.recv.load() 147 if m.err != nil { 148 return 0, m.err 149 } 150 copied := copy(p, m.data) 151 r.last = m.data[copied:] 152 return copied, nil 153 } 154} 155 156type streamState uint32 157 158const ( 159 streamActive streamState = iota 160 streamWriteDone // EndStream sent 161 streamReadDone // EndStream received 162 streamDone // the entire stream is finished. 163) 164 165// Stream represents an RPC in the transport layer. 166type Stream struct { 167 id uint32 168 st ServerTransport // nil for client side Stream 169 ctx context.Context // the associated context of the stream 170 cancel context.CancelFunc // always nil for client side Stream 171 done chan struct{} // closed at the end of stream to unblock writers. On the client side. 172 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) 173 method string // the associated RPC method of the stream 174 recvCompress string 175 sendCompress string 176 buf *recvBuffer 177 trReader io.Reader 178 fc *inFlow 179 recvQuota uint32 180 wq *writeQuota 181 182 // Callback to state application's intentions to read data. This 183 // is used to adjust flow control, if needed. 184 requestRead func(int) 185 186 headerChan chan struct{} // closed to indicate the end of header metadata. 187 headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. 188 header metadata.MD // the received header metadata. 189 trailer metadata.MD // the key-value map of trailer metadata. 190 191 headerOk bool // becomes true from the first header is about to send 192 state streamState 193 194 status *status.Status // the status error received from the server 195 196 bytesReceived uint32 // indicates whether any bytes have been received on this stream 197 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream 198 199 // contentSubtype is the content-subtype for requests. 200 // this must be lowercase or the behavior is undefined. 201 contentSubtype string 202} 203 204func (s *Stream) swapState(st streamState) streamState { 205 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) 206} 207 208func (s *Stream) compareAndSwapState(oldState, newState streamState) bool { 209 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState)) 210} 211 212func (s *Stream) getState() streamState { 213 return streamState(atomic.LoadUint32((*uint32)(&s.state))) 214} 215 216func (s *Stream) waitOnHeader() error { 217 if s.headerChan == nil { 218 // On the server headerChan is always nil since a stream originates 219 // only after having received headers. 220 return nil 221 } 222 select { 223 case <-s.ctx.Done(): 224 return ContextErr(s.ctx.Err()) 225 case <-s.headerChan: 226 return nil 227 } 228} 229 230// RecvCompress returns the compression algorithm applied to the inbound 231// message. It is empty string if there is no compression applied. 232func (s *Stream) RecvCompress() string { 233 if err := s.waitOnHeader(); err != nil { 234 return "" 235 } 236 return s.recvCompress 237} 238 239// SetSendCompress sets the compression algorithm to the stream. 240func (s *Stream) SetSendCompress(str string) { 241 s.sendCompress = str 242} 243 244// Done returns a chanel which is closed when it receives the final status 245// from the server. 246func (s *Stream) Done() <-chan struct{} { 247 return s.done 248} 249 250// Header acquires the key-value pairs of header metadata once it 251// is available. It blocks until i) the metadata is ready or ii) there is no 252// header metadata or iii) the stream is canceled/expired. 253func (s *Stream) Header() (metadata.MD, error) { 254 err := s.waitOnHeader() 255 // Even if the stream is closed, header is returned if available. 256 select { 257 case <-s.headerChan: 258 if s.header == nil { 259 return nil, nil 260 } 261 return s.header.Copy(), nil 262 default: 263 } 264 return nil, err 265} 266 267// Trailer returns the cached trailer metedata. Note that if it is not called 268// after the entire stream is done, it could return an empty MD. Client 269// side only. 270// It can be safely read only after stream has ended that is either read 271// or write have returned io.EOF. 272func (s *Stream) Trailer() metadata.MD { 273 c := s.trailer.Copy() 274 return c 275} 276 277// ServerTransport returns the underlying ServerTransport for the stream. 278// The client side stream always returns nil. 279func (s *Stream) ServerTransport() ServerTransport { 280 return s.st 281} 282 283// ContentSubtype returns the content-subtype for a request. For example, a 284// content-subtype of "proto" will result in a content-type of 285// "application/grpc+proto". This will always be lowercase. See 286// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for 287// more details. 288func (s *Stream) ContentSubtype() string { 289 return s.contentSubtype 290} 291 292// Context returns the context of the stream. 293func (s *Stream) Context() context.Context { 294 return s.ctx 295} 296 297// Method returns the method for the stream. 298func (s *Stream) Method() string { 299 return s.method 300} 301 302// Status returns the status received from the server. 303// Status can be read safely only after the stream has ended, 304// that is, read or write has returned io.EOF. 305func (s *Stream) Status() *status.Status { 306 return s.status 307} 308 309// SetHeader sets the header metadata. This can be called multiple times. 310// Server side only. 311// This should not be called in parallel to other data writes. 312func (s *Stream) SetHeader(md metadata.MD) error { 313 if md.Len() == 0 { 314 return nil 315 } 316 if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) { 317 return ErrIllegalHeaderWrite 318 } 319 s.header = metadata.Join(s.header, md) 320 return nil 321} 322 323// SendHeader sends the given header metadata. The given metadata is 324// combined with any metadata set by previous calls to SetHeader and 325// then written to the transport stream. 326func (s *Stream) SendHeader(md metadata.MD) error { 327 t := s.ServerTransport() 328 return t.WriteHeader(s, md) 329} 330 331// SetTrailer sets the trailer metadata which will be sent with the RPC status 332// by the server. This can be called multiple times. Server side only. 333// This should not be called parallel to other data writes. 334func (s *Stream) SetTrailer(md metadata.MD) error { 335 if md.Len() == 0 { 336 return nil 337 } 338 s.trailer = metadata.Join(s.trailer, md) 339 return nil 340} 341 342func (s *Stream) write(m recvMsg) { 343 s.buf.put(m) 344} 345 346// Read reads all p bytes from the wire for this stream. 347func (s *Stream) Read(p []byte) (n int, err error) { 348 // Don't request a read if there was an error earlier 349 if er := s.trReader.(*transportReader).er; er != nil { 350 return 0, er 351 } 352 s.requestRead(len(p)) 353 return io.ReadFull(s.trReader, p) 354} 355 356// tranportReader reads all the data available for this Stream from the transport and 357// passes them into the decoder, which converts them into a gRPC message stream. 358// The error is io.EOF when the stream is done or another non-nil error if 359// the stream broke. 360type transportReader struct { 361 reader io.Reader 362 // The handler to control the window update procedure for both this 363 // particular stream and the associated transport. 364 windowHandler func(int) 365 er error 366} 367 368func (t *transportReader) Read(p []byte) (n int, err error) { 369 n, err = t.reader.Read(p) 370 if err != nil { 371 t.er = err 372 return 373 } 374 t.windowHandler(n) 375 return 376} 377 378// BytesReceived indicates whether any bytes have been received on this stream. 379func (s *Stream) BytesReceived() bool { 380 return atomic.LoadUint32(&s.bytesReceived) == 1 381} 382 383// Unprocessed indicates whether the server did not process this stream -- 384// i.e. it sent a refused stream or GOAWAY including this stream ID. 385func (s *Stream) Unprocessed() bool { 386 return atomic.LoadUint32(&s.unprocessed) == 1 387} 388 389// GoString is implemented by Stream so context.String() won't 390// race when printing %#v. 391func (s *Stream) GoString() string { 392 return fmt.Sprintf("<stream: %p, %v>", s, s.method) 393} 394 395// state of transport 396type transportState int 397 398const ( 399 reachable transportState = iota 400 closing 401 draining 402) 403 404// ServerConfig consists of all the configurations to establish a server transport. 405type ServerConfig struct { 406 MaxStreams uint32 407 AuthInfo credentials.AuthInfo 408 InTapHandle tap.ServerInHandle 409 StatsHandler stats.Handler 410 KeepaliveParams keepalive.ServerParameters 411 KeepalivePolicy keepalive.EnforcementPolicy 412 InitialWindowSize int32 413 InitialConnWindowSize int32 414 WriteBufferSize int 415 ReadBufferSize int 416 ChannelzParentID int64 417} 418 419// NewServerTransport creates a ServerTransport with conn or non-nil error 420// if it fails. 421func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) { 422 return newHTTP2Server(conn, config) 423} 424 425// ConnectOptions covers all relevant options for communicating with the server. 426type ConnectOptions struct { 427 // UserAgent is the application user agent. 428 UserAgent string 429 // Authority is the :authority pseudo-header to use. This field has no effect if 430 // TransportCredentials is set. 431 Authority string 432 // Dialer specifies how to dial a network address. 433 Dialer func(context.Context, string) (net.Conn, error) 434 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. 435 FailOnNonTempDialError bool 436 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. 437 PerRPCCredentials []credentials.PerRPCCredentials 438 // TransportCredentials stores the Authenticator required to setup a client connection. 439 TransportCredentials credentials.TransportCredentials 440 // KeepaliveParams stores the keepalive parameters. 441 KeepaliveParams keepalive.ClientParameters 442 // StatsHandler stores the handler for stats. 443 StatsHandler stats.Handler 444 // InitialWindowSize sets the initial window size for a stream. 445 InitialWindowSize int32 446 // InitialConnWindowSize sets the initial window size for a connection. 447 InitialConnWindowSize int32 448 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire. 449 WriteBufferSize int 450 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. 451 ReadBufferSize int 452 // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. 453 ChannelzParentID int64 454} 455 456// TargetInfo contains the information of the target such as network address and metadata. 457type TargetInfo struct { 458 Addr string 459 Metadata interface{} 460 Authority string 461} 462 463// NewClientTransport establishes the transport with the required ConnectOptions 464// and returns it to the caller. 465func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) { 466 return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess) 467} 468 469// Options provides additional hints and information for message 470// transmission. 471type Options struct { 472 // Last indicates whether this write is the last piece for 473 // this stream. 474 Last bool 475 476 // Delay is a hint to the transport implementation for whether 477 // the data could be buffered for a batching write. The 478 // transport implementation may ignore the hint. 479 Delay bool 480} 481 482// CallHdr carries the information of a particular RPC. 483type CallHdr struct { 484 // Host specifies the peer's host. 485 Host string 486 487 // Method specifies the operation to perform. 488 Method string 489 490 // SendCompress specifies the compression algorithm applied on 491 // outbound message. 492 SendCompress string 493 494 // Creds specifies credentials.PerRPCCredentials for a call. 495 Creds credentials.PerRPCCredentials 496 497 // Flush indicates whether a new stream command should be sent 498 // to the peer without waiting for the first data. This is 499 // only a hint. 500 // If it's true, the transport may modify the flush decision 501 // for performance purposes. 502 // If it's false, new stream will never be flushed. 503 Flush bool 504 505 // ContentSubtype specifies the content-subtype for a request. For example, a 506 // content-subtype of "proto" will result in a content-type of 507 // "application/grpc+proto". The value of ContentSubtype must be all 508 // lowercase, otherwise the behavior is undefined. See 509 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests 510 // for more details. 511 ContentSubtype string 512} 513 514// ClientTransport is the common interface for all gRPC client-side transport 515// implementations. 516type ClientTransport interface { 517 // Close tears down this transport. Once it returns, the transport 518 // should not be accessed any more. The caller must make sure this 519 // is called only once. 520 Close() error 521 522 // GracefulClose starts to tear down the transport. It stops accepting 523 // new RPCs and wait the completion of the pending RPCs. 524 GracefulClose() error 525 526 // Write sends the data for the given stream. A nil stream indicates 527 // the write is to be performed on the transport as a whole. 528 Write(s *Stream, hdr []byte, data []byte, opts *Options) error 529 530 // NewStream creates a Stream for an RPC. 531 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) 532 533 // CloseStream clears the footprint of a stream when the stream is 534 // not needed any more. The err indicates the error incurred when 535 // CloseStream is called. Must be called when a stream is finished 536 // unless the associated transport is closing. 537 CloseStream(stream *Stream, err error) 538 539 // Error returns a channel that is closed when some I/O error 540 // happens. Typically the caller should have a goroutine to monitor 541 // this in order to take action (e.g., close the current transport 542 // and create a new one) in error case. It should not return nil 543 // once the transport is initiated. 544 Error() <-chan struct{} 545 546 // GoAway returns a channel that is closed when ClientTransport 547 // receives the draining signal from the server (e.g., GOAWAY frame in 548 // HTTP/2). 549 GoAway() <-chan struct{} 550 551 // GetGoAwayReason returns the reason why GoAway frame was received. 552 GetGoAwayReason() GoAwayReason 553 554 // IncrMsgSent increments the number of message sent through this transport. 555 IncrMsgSent() 556 557 // IncrMsgRecv increments the number of message received through this transport. 558 IncrMsgRecv() 559} 560 561// ServerTransport is the common interface for all gRPC server-side transport 562// implementations. 563// 564// Methods may be called concurrently from multiple goroutines, but 565// Write methods for a given Stream will be called serially. 566type ServerTransport interface { 567 // HandleStreams receives incoming streams using the given handler. 568 HandleStreams(func(*Stream), func(context.Context, string) context.Context) 569 570 // WriteHeader sends the header metadata for the given stream. 571 // WriteHeader may not be called on all streams. 572 WriteHeader(s *Stream, md metadata.MD) error 573 574 // Write sends the data for the given stream. 575 // Write may not be called on all streams. 576 Write(s *Stream, hdr []byte, data []byte, opts *Options) error 577 578 // WriteStatus sends the status of a stream to the client. WriteStatus is 579 // the final call made on a stream and always occurs. 580 WriteStatus(s *Stream, st *status.Status) error 581 582 // Close tears down the transport. Once it is called, the transport 583 // should not be accessed any more. All the pending streams and their 584 // handlers will be terminated asynchronously. 585 Close() error 586 587 // RemoteAddr returns the remote network address. 588 RemoteAddr() net.Addr 589 590 // Drain notifies the client this ServerTransport stops accepting new RPCs. 591 Drain() 592 593 // IncrMsgSent increments the number of message sent through this transport. 594 IncrMsgSent() 595 596 // IncrMsgRecv increments the number of message received through this transport. 597 IncrMsgRecv() 598} 599 600// streamErrorf creates an StreamError with the specified error code and description. 601func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError { 602 return StreamError{ 603 Code: c, 604 Desc: fmt.Sprintf(format, a...), 605 } 606} 607 608// connectionErrorf creates an ConnectionError with the specified error description. 609func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { 610 return ConnectionError{ 611 Desc: fmt.Sprintf(format, a...), 612 temp: temp, 613 err: e, 614 } 615} 616 617// ConnectionError is an error that results in the termination of the 618// entire connection and the retry of all the active streams. 619type ConnectionError struct { 620 Desc string 621 temp bool 622 err error 623} 624 625func (e ConnectionError) Error() string { 626 return fmt.Sprintf("connection error: desc = %q", e.Desc) 627} 628 629// Temporary indicates if this connection error is temporary or fatal. 630func (e ConnectionError) Temporary() bool { 631 return e.temp 632} 633 634// Origin returns the original error of this connection error. 635func (e ConnectionError) Origin() error { 636 // Never return nil error here. 637 // If the original error is nil, return itself. 638 if e.err == nil { 639 return e 640 } 641 return e.err 642} 643 644var ( 645 // ErrConnClosing indicates that the transport is closing. 646 ErrConnClosing = connectionErrorf(true, nil, "transport is closing") 647 // errStreamDrain indicates that the stream is rejected because the 648 // connection is draining. This could be caused by goaway or balancer 649 // removing the address. 650 errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining") 651 // errStreamDone is returned from write at the client side to indiacte application 652 // layer of an error. 653 errStreamDone = errors.New("the stream is done") 654 // StatusGoAway indicates that the server sent a GOAWAY that included this 655 // stream's ID in unprocessed RPCs. 656 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") 657) 658 659// TODO: See if we can replace StreamError with status package errors. 660 661// StreamError is an error that only affects one stream within a connection. 662type StreamError struct { 663 Code codes.Code 664 Desc string 665} 666 667func (e StreamError) Error() string { 668 return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc) 669} 670 671// GoAwayReason contains the reason for the GoAway frame received. 672type GoAwayReason uint8 673 674const ( 675 // GoAwayInvalid indicates that no GoAway frame is received. 676 GoAwayInvalid GoAwayReason = 0 677 // GoAwayNoReason is the default value when GoAway frame is received. 678 GoAwayNoReason GoAwayReason = 1 679 // GoAwayTooManyPings indicates that a GoAway frame with 680 // ErrCodeEnhanceYourCalm was received and that the debug data said 681 // "too_many_pings". 682 GoAwayTooManyPings GoAwayReason = 2 683) 684