1/* 2 * 3 * Copyright 2014, Google Inc. 4 * All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are 8 * met: 9 * 10 * * Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * * Redistributions in binary form must reproduce the above 13 * copyright notice, this list of conditions and the following disclaimer 14 * in the documentation and/or other materials provided with the 15 * distribution. 16 * * Neither the name of Google Inc. nor the names of its 17 * contributors may be used to endorse or promote products derived from 18 * this software without specific prior written permission. 19 * 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 * 32 */ 33 34package grpc 35 36import ( 37 "bytes" 38 "compress/gzip" 39 "encoding/binary" 40 "fmt" 41 "io" 42 "io/ioutil" 43 "math" 44 "os" 45 "time" 46 47 "github.com/golang/protobuf/proto" 48 "golang.org/x/net/context" 49 "google.golang.org/grpc/codes" 50 "google.golang.org/grpc/metadata" 51 "google.golang.org/grpc/stats" 52 "google.golang.org/grpc/transport" 53) 54 55// Codec defines the interface gRPC uses to encode and decode messages. 56type Codec interface { 57 // Marshal returns the wire format of v. 58 Marshal(v interface{}) ([]byte, error) 59 // Unmarshal parses the wire format into v. 60 Unmarshal(data []byte, v interface{}) error 61 // String returns the name of the Codec implementation. The returned 62 // string will be used as part of content type in transmission. 63 String() string 64} 65 66// protoCodec is a Codec implementation with protobuf. It is the default codec for gRPC. 67type protoCodec struct{} 68 69func (protoCodec) Marshal(v interface{}) ([]byte, error) { 70 return proto.Marshal(v.(proto.Message)) 71} 72 73func (protoCodec) Unmarshal(data []byte, v interface{}) error { 74 return proto.Unmarshal(data, v.(proto.Message)) 75} 76 77func (protoCodec) String() string { 78 return "proto" 79} 80 81// Compressor defines the interface gRPC uses to compress a message. 82type Compressor interface { 83 // Do compresses p into w. 84 Do(w io.Writer, p []byte) error 85 // Type returns the compression algorithm the Compressor uses. 86 Type() string 87} 88 89// NewGZIPCompressor creates a Compressor based on GZIP. 90func NewGZIPCompressor() Compressor { 91 return &gzipCompressor{} 92} 93 94type gzipCompressor struct { 95} 96 97func (c *gzipCompressor) Do(w io.Writer, p []byte) error { 98 z := gzip.NewWriter(w) 99 if _, err := z.Write(p); err != nil { 100 return err 101 } 102 return z.Close() 103} 104 105func (c *gzipCompressor) Type() string { 106 return "gzip" 107} 108 109// Decompressor defines the interface gRPC uses to decompress a message. 110type Decompressor interface { 111 // Do reads the data from r and uncompress them. 112 Do(r io.Reader) ([]byte, error) 113 // Type returns the compression algorithm the Decompressor uses. 114 Type() string 115} 116 117type gzipDecompressor struct { 118} 119 120// NewGZIPDecompressor creates a Decompressor based on GZIP. 121func NewGZIPDecompressor() Decompressor { 122 return &gzipDecompressor{} 123} 124 125func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) { 126 z, err := gzip.NewReader(r) 127 if err != nil { 128 return nil, err 129 } 130 defer z.Close() 131 return ioutil.ReadAll(z) 132} 133 134func (d *gzipDecompressor) Type() string { 135 return "gzip" 136} 137 138// callInfo contains all related configuration and information about an RPC. 139type callInfo struct { 140 failFast bool 141 headerMD metadata.MD 142 trailerMD metadata.MD 143 traceInfo traceInfo // in trace.go 144} 145 146var defaultCallInfo = callInfo{failFast: true} 147 148// CallOption configures a Call before it starts or extracts information from 149// a Call after it completes. 150type CallOption interface { 151 // before is called before the call is sent to any server. If before 152 // returns a non-nil error, the RPC fails with that error. 153 before(*callInfo) error 154 155 // after is called after the call has completed. after cannot return an 156 // error, so any failures should be reported via output parameters. 157 after(*callInfo) 158} 159 160type beforeCall func(c *callInfo) error 161 162func (o beforeCall) before(c *callInfo) error { return o(c) } 163func (o beforeCall) after(c *callInfo) {} 164 165type afterCall func(c *callInfo) 166 167func (o afterCall) before(c *callInfo) error { return nil } 168func (o afterCall) after(c *callInfo) { o(c) } 169 170// Header returns a CallOptions that retrieves the header metadata 171// for a unary RPC. 172func Header(md *metadata.MD) CallOption { 173 return afterCall(func(c *callInfo) { 174 *md = c.headerMD 175 }) 176} 177 178// Trailer returns a CallOptions that retrieves the trailer metadata 179// for a unary RPC. 180func Trailer(md *metadata.MD) CallOption { 181 return afterCall(func(c *callInfo) { 182 *md = c.trailerMD 183 }) 184} 185 186// FailFast configures the action to take when an RPC is attempted on broken 187// connections or unreachable servers. If failfast is true, the RPC will fail 188// immediately. Otherwise, the RPC client will block the call until a 189// connection is available (or the call is canceled or times out) and will retry 190// the call if it fails due to a transient error. Please refer to 191// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md 192func FailFast(failFast bool) CallOption { 193 return beforeCall(func(c *callInfo) error { 194 c.failFast = failFast 195 return nil 196 }) 197} 198 199// The format of the payload: compressed or not? 200type payloadFormat uint8 201 202const ( 203 compressionNone payloadFormat = iota // no compression 204 compressionMade 205) 206 207// parser reads complete gRPC messages from the underlying reader. 208type parser struct { 209 // r is the underlying reader. 210 // See the comment on recvMsg for the permissible 211 // error types. 212 r io.Reader 213 214 // The header of a gRPC message. Find more detail 215 // at http://www.grpc.io/docs/guides/wire.html. 216 header [5]byte 217} 218 219// recvMsg reads a complete gRPC message from the stream. 220// 221// It returns the message and its payload (compression/encoding) 222// format. The caller owns the returned msg memory. 223// 224// If there is an error, possible values are: 225// * io.EOF, when no messages remain 226// * io.ErrUnexpectedEOF 227// * of type transport.ConnectionError 228// * of type transport.StreamError 229// No other error values or types must be returned, which also means 230// that the underlying io.Reader must not return an incompatible 231// error. 232func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) { 233 if _, err := io.ReadFull(p.r, p.header[:]); err != nil { 234 return 0, nil, err 235 } 236 237 pf = payloadFormat(p.header[0]) 238 length := binary.BigEndian.Uint32(p.header[1:]) 239 240 if length == 0 { 241 return pf, nil, nil 242 } 243 if length > uint32(maxMsgSize) { 244 return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize) 245 } 246 // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead 247 // of making it for each message: 248 msg = make([]byte, int(length)) 249 if _, err := io.ReadFull(p.r, msg); err != nil { 250 if err == io.EOF { 251 err = io.ErrUnexpectedEOF 252 } 253 return 0, nil, err 254 } 255 return pf, msg, nil 256} 257 258// encode serializes msg and prepends the message header. If msg is nil, it 259// generates the message header of 0 message length. 260func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) { 261 var ( 262 b []byte 263 length uint 264 ) 265 if msg != nil { 266 var err error 267 // TODO(zhaoq): optimize to reduce memory alloc and copying. 268 b, err = c.Marshal(msg) 269 if err != nil { 270 return nil, err 271 } 272 if outPayload != nil { 273 outPayload.Payload = msg 274 // TODO truncate large payload. 275 outPayload.Data = b 276 outPayload.Length = len(b) 277 } 278 if cp != nil { 279 if err := cp.Do(cbuf, b); err != nil { 280 return nil, err 281 } 282 b = cbuf.Bytes() 283 } 284 length = uint(len(b)) 285 } 286 if length > math.MaxUint32 { 287 return nil, Errorf(codes.InvalidArgument, "grpc: message too large (%d bytes)", length) 288 } 289 290 const ( 291 payloadLen = 1 292 sizeLen = 4 293 ) 294 295 var buf = make([]byte, payloadLen+sizeLen+len(b)) 296 297 // Write payload format 298 if cp == nil { 299 buf[0] = byte(compressionNone) 300 } else { 301 buf[0] = byte(compressionMade) 302 } 303 // Write length of b into buf 304 binary.BigEndian.PutUint32(buf[1:], uint32(length)) 305 // Copy encoded msg to buf 306 copy(buf[5:], b) 307 308 if outPayload != nil { 309 outPayload.WireLength = len(buf) 310 } 311 312 return buf, nil 313} 314 315func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error { 316 switch pf { 317 case compressionNone: 318 case compressionMade: 319 if dc == nil || recvCompress != dc.Type() { 320 return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress) 321 } 322 default: 323 return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf) 324 } 325 return nil 326} 327 328func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error { 329 pf, d, err := p.recvMsg(maxMsgSize) 330 if err != nil { 331 return err 332 } 333 if inPayload != nil { 334 inPayload.WireLength = len(d) 335 } 336 if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil { 337 return err 338 } 339 if pf == compressionMade { 340 d, err = dc.Do(bytes.NewReader(d)) 341 if err != nil { 342 return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) 343 } 344 } 345 if len(d) > maxMsgSize { 346 // TODO: Revisit the error code. Currently keep it consistent with java 347 // implementation. 348 return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize) 349 } 350 if err := c.Unmarshal(d, m); err != nil { 351 return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) 352 } 353 if inPayload != nil { 354 inPayload.RecvTime = time.Now() 355 inPayload.Payload = m 356 // TODO truncate large payload. 357 inPayload.Data = d 358 inPayload.Length = len(d) 359 } 360 return nil 361} 362 363// rpcError defines the status from an RPC. 364type rpcError struct { 365 code codes.Code 366 desc string 367} 368 369func (e *rpcError) Error() string { 370 return fmt.Sprintf("rpc error: code = %d desc = %s", e.code, e.desc) 371} 372 373// Code returns the error code for err if it was produced by the rpc system. 374// Otherwise, it returns codes.Unknown. 375func Code(err error) codes.Code { 376 if err == nil { 377 return codes.OK 378 } 379 if e, ok := err.(*rpcError); ok { 380 return e.code 381 } 382 return codes.Unknown 383} 384 385// ErrorDesc returns the error description of err if it was produced by the rpc system. 386// Otherwise, it returns err.Error() or empty string when err is nil. 387func ErrorDesc(err error) string { 388 if err == nil { 389 return "" 390 } 391 if e, ok := err.(*rpcError); ok { 392 return e.desc 393 } 394 return err.Error() 395} 396 397// Errorf returns an error containing an error code and a description; 398// Errorf returns nil if c is OK. 399func Errorf(c codes.Code, format string, a ...interface{}) error { 400 if c == codes.OK { 401 return nil 402 } 403 return &rpcError{ 404 code: c, 405 desc: fmt.Sprintf(format, a...), 406 } 407} 408 409// toRPCErr converts an error into a rpcError. 410func toRPCErr(err error) error { 411 switch e := err.(type) { 412 case *rpcError: 413 return err 414 case transport.StreamError: 415 return &rpcError{ 416 code: e.Code, 417 desc: e.Desc, 418 } 419 case transport.ConnectionError: 420 return &rpcError{ 421 code: codes.Internal, 422 desc: e.Desc, 423 } 424 default: 425 switch err { 426 case context.DeadlineExceeded: 427 return &rpcError{ 428 code: codes.DeadlineExceeded, 429 desc: err.Error(), 430 } 431 case context.Canceled: 432 return &rpcError{ 433 code: codes.Canceled, 434 desc: err.Error(), 435 } 436 case ErrClientConnClosing: 437 return &rpcError{ 438 code: codes.FailedPrecondition, 439 desc: err.Error(), 440 } 441 } 442 443 } 444 return Errorf(codes.Unknown, "%v", err) 445} 446 447// convertCode converts a standard Go error into its canonical code. Note that 448// this is only used to translate the error returned by the server applications. 449func convertCode(err error) codes.Code { 450 switch err { 451 case nil: 452 return codes.OK 453 case io.EOF: 454 return codes.OutOfRange 455 case io.ErrClosedPipe, io.ErrNoProgress, io.ErrShortBuffer, io.ErrShortWrite, io.ErrUnexpectedEOF: 456 return codes.FailedPrecondition 457 case os.ErrInvalid: 458 return codes.InvalidArgument 459 case context.Canceled: 460 return codes.Canceled 461 case context.DeadlineExceeded: 462 return codes.DeadlineExceeded 463 } 464 switch { 465 case os.IsExist(err): 466 return codes.AlreadyExists 467 case os.IsNotExist(err): 468 return codes.NotFound 469 case os.IsPermission(err): 470 return codes.PermissionDenied 471 } 472 return codes.Unknown 473} 474 475// MethodConfig defines the configuration recommended by the service providers for a 476// particular method. 477// This is EXPERIMENTAL and subject to change. 478type MethodConfig struct { 479 // WaitForReady indicates whether RPCs sent to this method should wait until 480 // the connection is ready by default (!failfast). The value specified via the 481 // gRPC client API will override the value set here. 482 WaitForReady bool 483 // Timeout is the default timeout for RPCs sent to this method. The actual 484 // deadline used will be the minimum of the value specified here and the value 485 // set by the application via the gRPC client API. If either one is not set, 486 // then the other will be used. If neither is set, then the RPC has no deadline. 487 Timeout time.Duration 488 // MaxReqSize is the maximum allowed payload size for an individual request in a 489 // stream (client->server) in bytes. The size which is measured is the serialized, 490 // uncompressed payload in bytes. The actual value used is the minumum of the value 491 // specified here and the value set by the application via the gRPC client API. If 492 // either one is not set, then the other will be used. If neither is set, then the 493 // built-in default is used. 494 // TODO: support this. 495 MaxReqSize uint64 496 // MaxRespSize is the maximum allowed payload size for an individual response in a 497 // stream (server->client) in bytes. 498 // TODO: support this. 499 MaxRespSize uint64 500} 501 502// ServiceConfig is provided by the service provider and contains parameters for how 503// clients that connect to the service should behave. 504// This is EXPERIMENTAL and subject to change. 505type ServiceConfig struct { 506 // LB is the load balancer the service providers recommends. The balancer specified 507 // via grpc.WithBalancer will override this. 508 LB Balancer 509 // Methods contains a map for the methods in this service. 510 Methods map[string]MethodConfig 511} 512 513// SupportPackageIsVersion4 is referenced from generated protocol buffer files 514// to assert that that code is compatible with this version of the grpc package. 515// 516// This constant may be renamed in the future if a change in the generated code 517// requires a synchronised update of grpc-go and protoc-gen-go. This constant 518// should not be referenced from any other code. 519const SupportPackageIsVersion4 = true 520