1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "errors" 23 "io" 24 "math" 25 "strconv" 26 "sync" 27 "time" 28 29 "golang.org/x/net/context" 30 "golang.org/x/net/trace" 31 "google.golang.org/grpc/balancer" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/encoding" 34 "google.golang.org/grpc/grpclog" 35 "google.golang.org/grpc/internal/channelz" 36 "google.golang.org/grpc/internal/grpcrand" 37 "google.golang.org/grpc/internal/transport" 38 "google.golang.org/grpc/metadata" 39 "google.golang.org/grpc/stats" 40 "google.golang.org/grpc/status" 41) 42 43// StreamHandler defines the handler called by gRPC server to complete the 44// execution of a streaming RPC. If a StreamHandler returns an error, it 45// should be produced by the status package, or else gRPC will use 46// codes.Unknown as the status code and err.Error() as the status message 47// of the RPC. 48type StreamHandler func(srv interface{}, stream ServerStream) error 49 50// StreamDesc represents a streaming RPC service's method specification. 51type StreamDesc struct { 52 StreamName string 53 Handler StreamHandler 54 55 // At least one of these is true. 56 ServerStreams bool 57 ClientStreams bool 58} 59 60// Stream defines the common interface a client or server stream has to satisfy. 61// 62// Deprecated: See ClientStream and ServerStream documentation instead. 63type Stream interface { 64 // Deprecated: See ClientStream and ServerStream documentation instead. 65 Context() context.Context 66 // Deprecated: See ClientStream and ServerStream documentation instead. 67 SendMsg(m interface{}) error 68 // Deprecated: See ClientStream and ServerStream documentation instead. 69 RecvMsg(m interface{}) error 70} 71 72// ClientStream defines the client-side behavior of a streaming RPC. 73// 74// All errors returned from ClientStream methods are compatible with the 75// status package. 76type ClientStream interface { 77 // Header returns the header metadata received from the server if there 78 // is any. It blocks if the metadata is not ready to read. 79 Header() (metadata.MD, error) 80 // Trailer returns the trailer metadata from the server, if there is any. 81 // It must only be called after stream.CloseAndRecv has returned, or 82 // stream.Recv has returned a non-nil error (including io.EOF). 83 Trailer() metadata.MD 84 // CloseSend closes the send direction of the stream. It closes the stream 85 // when non-nil error is met. 86 CloseSend() error 87 // Context returns the context for this stream. 88 // 89 // It should not be called until after Header or RecvMsg has returned. Once 90 // called, subsequent client-side retries are disabled. 91 Context() context.Context 92 // SendMsg is generally called by generated code. On error, SendMsg aborts 93 // the stream. If the error was generated by the client, the status is 94 // returned directly; otherwise, io.EOF is returned and the status of 95 // the stream may be discovered using RecvMsg. 96 // 97 // SendMsg blocks until: 98 // - There is sufficient flow control to schedule m with the transport, or 99 // - The stream is done, or 100 // - The stream breaks. 101 // 102 // SendMsg does not wait until the message is received by the server. An 103 // untimely stream closure may result in lost messages. To ensure delivery, 104 // users should ensure the RPC completed successfully using RecvMsg. 105 // 106 // It is safe to have a goroutine calling SendMsg and another goroutine 107 // calling RecvMsg on the same stream at the same time, but it is not safe 108 // to call SendMsg on the same stream in different goroutines. 109 SendMsg(m interface{}) error 110 // RecvMsg blocks until it receives a message into m or the stream is 111 // done. It returns io.EOF when the stream completes successfully. On 112 // any other error, the stream is aborted and the error contains the RPC 113 // status. 114 // 115 // It is safe to have a goroutine calling SendMsg and another goroutine 116 // calling RecvMsg on the same stream at the same time, but it is not 117 // safe to call RecvMsg on the same stream in different goroutines. 118 RecvMsg(m interface{}) error 119} 120 121// NewStream creates a new Stream for the client side. This is typically 122// called by generated code. ctx is used for the lifetime of the stream. 123// 124// To ensure resources are not leaked due to the stream returned, one of the following 125// actions must be performed: 126// 127// 1. Call Close on the ClientConn. 128// 2. Cancel the context provided. 129// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated 130// client-streaming RPC, for instance, might use the helper function 131// CloseAndRecv (note that CloseSend does not Recv, therefore is not 132// guaranteed to release all resources). 133// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. 134// 135// If none of the above happen, a goroutine and a context will be leaked, and grpc 136// will not call the optionally-configured stats handler with a stats.End message. 137func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { 138 // allow interceptor to see all applicable call options, which means those 139 // configured as defaults from dial option as well as per-call options 140 opts = combine(cc.dopts.callOptions, opts) 141 142 if cc.dopts.streamInt != nil { 143 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) 144 } 145 return newClientStream(ctx, desc, cc, method, opts...) 146} 147 148// NewClientStream is a wrapper for ClientConn.NewStream. 149func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 150 return cc.NewStream(ctx, desc, method, opts...) 151} 152 153func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { 154 if channelz.IsOn() { 155 cc.incrCallsStarted() 156 defer func() { 157 if err != nil { 158 cc.incrCallsFailed() 159 } 160 }() 161 } 162 c := defaultCallInfo() 163 mc := cc.GetMethodConfig(method) 164 if mc.WaitForReady != nil { 165 c.failFast = !*mc.WaitForReady 166 } 167 168 // Possible context leak: 169 // The cancel function for the child context we create will only be called 170 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if 171 // an error is generated by SendMsg. 172 // https://github.com/grpc/grpc-go/issues/1818. 173 var cancel context.CancelFunc 174 if mc.Timeout != nil && *mc.Timeout >= 0 { 175 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) 176 } else { 177 ctx, cancel = context.WithCancel(ctx) 178 } 179 defer func() { 180 if err != nil { 181 cancel() 182 } 183 }() 184 185 for _, o := range opts { 186 if err := o.before(c); err != nil { 187 return nil, toRPCErr(err) 188 } 189 } 190 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) 191 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 192 if err := setCallInfoCodec(c); err != nil { 193 return nil, err 194 } 195 196 callHdr := &transport.CallHdr{ 197 Host: cc.authority, 198 Method: method, 199 ContentSubtype: c.contentSubtype, 200 } 201 202 // Set our outgoing compression according to the UseCompressor CallOption, if 203 // set. In that case, also find the compressor from the encoding package. 204 // Otherwise, use the compressor configured by the WithCompressor DialOption, 205 // if set. 206 var cp Compressor 207 var comp encoding.Compressor 208 if ct := c.compressorType; ct != "" { 209 callHdr.SendCompress = ct 210 if ct != encoding.Identity { 211 comp = encoding.GetCompressor(ct) 212 if comp == nil { 213 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) 214 } 215 } 216 } else if cc.dopts.cp != nil { 217 callHdr.SendCompress = cc.dopts.cp.Type() 218 cp = cc.dopts.cp 219 } 220 if c.creds != nil { 221 callHdr.Creds = c.creds 222 } 223 var trInfo traceInfo 224 if EnableTracing { 225 trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) 226 trInfo.firstLine.client = true 227 if deadline, ok := ctx.Deadline(); ok { 228 trInfo.firstLine.deadline = deadline.Sub(time.Now()) 229 } 230 trInfo.tr.LazyLog(&trInfo.firstLine, false) 231 ctx = trace.NewContext(ctx, trInfo.tr) 232 } 233 ctx = newContextWithRPCInfo(ctx, c.failFast) 234 sh := cc.dopts.copts.StatsHandler 235 var beginTime time.Time 236 if sh != nil { 237 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) 238 beginTime = time.Now() 239 begin := &stats.Begin{ 240 Client: true, 241 BeginTime: beginTime, 242 FailFast: c.failFast, 243 } 244 sh.HandleRPC(ctx, begin) 245 } 246 247 cs := &clientStream{ 248 callHdr: callHdr, 249 ctx: ctx, 250 methodConfig: &mc, 251 opts: opts, 252 callInfo: c, 253 cc: cc, 254 desc: desc, 255 codec: c.codec, 256 cp: cp, 257 comp: comp, 258 cancel: cancel, 259 beginTime: beginTime, 260 firstAttempt: true, 261 } 262 if !cc.dopts.disableRetry { 263 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) 264 } 265 266 cs.callInfo.stream = cs 267 // Only this initial attempt has stats/tracing. 268 // TODO(dfawley): move to newAttempt when per-attempt stats are implemented. 269 if err := cs.newAttemptLocked(sh, trInfo); err != nil { 270 cs.finish(err) 271 return nil, err 272 } 273 274 op := func(a *csAttempt) error { return a.newStream() } 275 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { 276 cs.finish(err) 277 return nil, err 278 } 279 280 if desc != unaryStreamDesc { 281 // Listen on cc and stream contexts to cleanup when the user closes the 282 // ClientConn or cancels the stream context. In all other cases, an error 283 // should already be injected into the recv buffer by the transport, which 284 // the client will eventually receive, and then we will cancel the stream's 285 // context in clientStream.finish. 286 go func() { 287 select { 288 case <-cc.ctx.Done(): 289 cs.finish(ErrClientConnClosing) 290 case <-ctx.Done(): 291 cs.finish(toRPCErr(ctx.Err())) 292 } 293 }() 294 } 295 return cs, nil 296} 297 298func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error { 299 cs.attempt = &csAttempt{ 300 cs: cs, 301 dc: cs.cc.dopts.dc, 302 statsHandler: sh, 303 trInfo: trInfo, 304 } 305 306 if err := cs.ctx.Err(); err != nil { 307 return toRPCErr(err) 308 } 309 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method) 310 if err != nil { 311 return err 312 } 313 cs.attempt.t = t 314 cs.attempt.done = done 315 return nil 316} 317 318func (a *csAttempt) newStream() error { 319 cs := a.cs 320 cs.callHdr.PreviousAttempts = cs.numRetries 321 s, err := a.t.NewStream(cs.ctx, cs.callHdr) 322 if err != nil { 323 return toRPCErr(err) 324 } 325 cs.attempt.s = s 326 cs.attempt.p = &parser{r: s} 327 return nil 328} 329 330// clientStream implements a client side Stream. 331type clientStream struct { 332 callHdr *transport.CallHdr 333 opts []CallOption 334 callInfo *callInfo 335 cc *ClientConn 336 desc *StreamDesc 337 338 codec baseCodec 339 cp Compressor 340 comp encoding.Compressor 341 342 cancel context.CancelFunc // cancels all attempts 343 344 sentLast bool // sent an end stream 345 beginTime time.Time 346 347 methodConfig *MethodConfig 348 349 ctx context.Context // the application's context, wrapped by stats/tracing 350 351 retryThrottler *retryThrottler // The throttler active when the RPC began. 352 353 mu sync.Mutex 354 firstAttempt bool // if true, transparent retry is valid 355 numRetries int // exclusive of transparent retry attempt(s) 356 numRetriesSincePushback int // retries since pushback; to reset backoff 357 finished bool // TODO: replace with atomic cmpxchg or sync.Once? 358 attempt *csAttempt // the active client stream attempt 359 // TODO(hedging): hedging will have multiple attempts simultaneously. 360 committed bool // active attempt committed for retry? 361 buffer []func(a *csAttempt) error // operations to replay on retry 362 bufferSize int // current size of buffer 363} 364 365// csAttempt implements a single transport stream attempt within a 366// clientStream. 367type csAttempt struct { 368 cs *clientStream 369 t transport.ClientTransport 370 s *transport.Stream 371 p *parser 372 done func(balancer.DoneInfo) 373 374 finished bool 375 dc Decompressor 376 decomp encoding.Compressor 377 decompSet bool 378 379 mu sync.Mutex // guards trInfo.tr 380 // trInfo.tr is set when created (if EnableTracing is true), 381 // and cleared when the finish method is called. 382 trInfo traceInfo 383 384 statsHandler stats.Handler 385} 386 387func (cs *clientStream) commitAttemptLocked() { 388 cs.committed = true 389 cs.buffer = nil 390} 391 392func (cs *clientStream) commitAttempt() { 393 cs.mu.Lock() 394 cs.commitAttemptLocked() 395 cs.mu.Unlock() 396} 397 398// shouldRetry returns nil if the RPC should be retried; otherwise it returns 399// the error that should be returned by the operation. 400func (cs *clientStream) shouldRetry(err error) error { 401 if cs.attempt.s == nil && !cs.callInfo.failFast { 402 // In the event of any error from NewStream (attempt.s == nil), we 403 // never attempted to write anything to the wire, so we can retry 404 // indefinitely for non-fail-fast RPCs. 405 return nil 406 } 407 if cs.finished || cs.committed { 408 // RPC is finished or committed; cannot retry. 409 return err 410 } 411 // Wait for the trailers. 412 if cs.attempt.s != nil { 413 <-cs.attempt.s.Done() 414 } 415 if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { 416 // First attempt, wait-for-ready, stream unprocessed: transparently retry. 417 cs.firstAttempt = false 418 return nil 419 } 420 cs.firstAttempt = false 421 if cs.cc.dopts.disableRetry { 422 return err 423 } 424 425 pushback := 0 426 hasPushback := false 427 if cs.attempt.s != nil { 428 if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil { 429 // Context error; stop now. 430 return toErr 431 } else if !to { 432 return err 433 } 434 435 // TODO(retry): Move down if the spec changes to not check server pushback 436 // before considering this a failure for throttling. 437 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] 438 if len(sps) == 1 { 439 var e error 440 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { 441 grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0]) 442 cs.retryThrottler.throttle() // This counts as a failure for throttling. 443 return err 444 } 445 hasPushback = true 446 } else if len(sps) > 1 { 447 grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps) 448 cs.retryThrottler.throttle() // This counts as a failure for throttling. 449 return err 450 } 451 } 452 453 var code codes.Code 454 if cs.attempt.s != nil { 455 code = cs.attempt.s.Status().Code() 456 } else { 457 code = status.Convert(err).Code() 458 } 459 460 rp := cs.methodConfig.retryPolicy 461 if rp == nil || !rp.retryableStatusCodes[code] { 462 return err 463 } 464 465 // Note: the ordering here is important; we count this as a failure 466 // only if the code matched a retryable code. 467 if cs.retryThrottler.throttle() { 468 return err 469 } 470 if cs.numRetries+1 >= rp.maxAttempts { 471 return err 472 } 473 474 var dur time.Duration 475 if hasPushback { 476 dur = time.Millisecond * time.Duration(pushback) 477 cs.numRetriesSincePushback = 0 478 } else { 479 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback)) 480 cur := float64(rp.initialBackoff) * fact 481 if max := float64(rp.maxBackoff); cur > max { 482 cur = max 483 } 484 dur = time.Duration(grpcrand.Int63n(int64(cur))) 485 cs.numRetriesSincePushback++ 486 } 487 488 // TODO(dfawley): we could eagerly fail here if dur puts us past the 489 // deadline, but unsure if it is worth doing. 490 t := time.NewTimer(dur) 491 select { 492 case <-t.C: 493 cs.numRetries++ 494 return nil 495 case <-cs.ctx.Done(): 496 t.Stop() 497 return status.FromContextError(cs.ctx.Err()).Err() 498 } 499} 500 501// Returns nil if a retry was performed and succeeded; error otherwise. 502func (cs *clientStream) retryLocked(lastErr error) error { 503 for { 504 cs.attempt.finish(lastErr) 505 if err := cs.shouldRetry(lastErr); err != nil { 506 cs.commitAttemptLocked() 507 return err 508 } 509 if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil { 510 return err 511 } 512 if lastErr = cs.replayBufferLocked(); lastErr == nil { 513 return nil 514 } 515 } 516} 517 518func (cs *clientStream) Context() context.Context { 519 cs.commitAttempt() 520 // No need to lock before using attempt, since we know it is committed and 521 // cannot change. 522 return cs.attempt.s.Context() 523} 524 525func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { 526 cs.mu.Lock() 527 for { 528 if cs.committed { 529 cs.mu.Unlock() 530 return op(cs.attempt) 531 } 532 a := cs.attempt 533 cs.mu.Unlock() 534 err := op(a) 535 cs.mu.Lock() 536 if a != cs.attempt { 537 // We started another attempt already. 538 continue 539 } 540 if err == io.EOF { 541 <-a.s.Done() 542 } 543 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { 544 onSuccess() 545 cs.mu.Unlock() 546 return err 547 } 548 if err := cs.retryLocked(err); err != nil { 549 cs.mu.Unlock() 550 return err 551 } 552 } 553} 554 555func (cs *clientStream) Header() (metadata.MD, error) { 556 var m metadata.MD 557 err := cs.withRetry(func(a *csAttempt) error { 558 var err error 559 m, err = a.s.Header() 560 return toRPCErr(err) 561 }, cs.commitAttemptLocked) 562 if err != nil { 563 cs.finish(err) 564 } 565 return m, err 566} 567 568func (cs *clientStream) Trailer() metadata.MD { 569 // On RPC failure, we never need to retry, because usage requires that 570 // RecvMsg() returned a non-nil error before calling this function is valid. 571 // We would have retried earlier if necessary. 572 // 573 // Commit the attempt anyway, just in case users are not following those 574 // directions -- it will prevent races and should not meaningfully impact 575 // performance. 576 cs.commitAttempt() 577 if cs.attempt.s == nil { 578 return nil 579 } 580 return cs.attempt.s.Trailer() 581} 582 583func (cs *clientStream) replayBufferLocked() error { 584 a := cs.attempt 585 for _, f := range cs.buffer { 586 if err := f(a); err != nil { 587 return err 588 } 589 } 590 return nil 591} 592 593func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) { 594 // Note: we still will buffer if retry is disabled (for transparent retries). 595 if cs.committed { 596 return 597 } 598 cs.bufferSize += sz 599 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { 600 cs.commitAttemptLocked() 601 return 602 } 603 cs.buffer = append(cs.buffer, op) 604} 605 606func (cs *clientStream) SendMsg(m interface{}) (err error) { 607 defer func() { 608 if err != nil && err != io.EOF { 609 // Call finish on the client stream for errors generated by this SendMsg 610 // call, as these indicate problems created by this client. (Transport 611 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real 612 // error will be returned from RecvMsg eventually in that case, or be 613 // retried.) 614 cs.finish(err) 615 } 616 }() 617 if cs.sentLast { 618 return status.Errorf(codes.Internal, "SendMsg called after CloseSend") 619 } 620 if !cs.desc.ClientStreams { 621 cs.sentLast = true 622 } 623 data, err := encode(cs.codec, m) 624 if err != nil { 625 return err 626 } 627 compData, err := compress(data, cs.cp, cs.comp) 628 if err != nil { 629 return err 630 } 631 hdr, payload := msgHeader(data, compData) 632 // TODO(dfawley): should we be checking len(data) instead? 633 if len(payload) > *cs.callInfo.maxSendMessageSize { 634 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) 635 } 636 op := func(a *csAttempt) error { 637 err := a.sendMsg(m, hdr, payload, data) 638 // nil out the message and uncomp when replaying; they are only needed for 639 // stats which is disabled for subsequent attempts. 640 m, data = nil, nil 641 return err 642 } 643 return cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) 644} 645 646func (cs *clientStream) RecvMsg(m interface{}) error { 647 err := cs.withRetry(func(a *csAttempt) error { 648 return a.recvMsg(m) 649 }, cs.commitAttemptLocked) 650 if err != nil || !cs.desc.ServerStreams { 651 // err != nil or non-server-streaming indicates end of stream. 652 cs.finish(err) 653 } 654 return err 655} 656 657func (cs *clientStream) CloseSend() error { 658 if cs.sentLast { 659 // TODO: return an error and finish the stream instead, due to API misuse? 660 return nil 661 } 662 cs.sentLast = true 663 op := func(a *csAttempt) error { 664 a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) 665 // Always return nil; io.EOF is the only error that might make sense 666 // instead, but there is no need to signal the client to call RecvMsg 667 // as the only use left for the stream after CloseSend is to call 668 // RecvMsg. This also matches historical behavior. 669 return nil 670 } 671 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }) 672 // We never returned an error here for reasons. 673 return nil 674} 675 676func (cs *clientStream) finish(err error) { 677 if err == io.EOF { 678 // Ending a stream with EOF indicates a success. 679 err = nil 680 } 681 cs.mu.Lock() 682 if cs.finished { 683 cs.mu.Unlock() 684 return 685 } 686 cs.finished = true 687 cs.commitAttemptLocked() 688 cs.mu.Unlock() 689 if err == nil { 690 cs.retryThrottler.successfulRPC() 691 } 692 if channelz.IsOn() { 693 if err != nil { 694 cs.cc.incrCallsFailed() 695 } else { 696 cs.cc.incrCallsSucceeded() 697 } 698 } 699 if cs.attempt != nil { 700 cs.attempt.finish(err) 701 } 702 // after functions all rely upon having a stream. 703 if cs.attempt.s != nil { 704 for _, o := range cs.opts { 705 o.after(cs.callInfo) 706 } 707 } 708 cs.cancel() 709} 710 711func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { 712 cs := a.cs 713 if EnableTracing { 714 a.mu.Lock() 715 if a.trInfo.tr != nil { 716 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 717 } 718 a.mu.Unlock() 719 } 720 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { 721 if !cs.desc.ClientStreams { 722 // For non-client-streaming RPCs, we return nil instead of EOF on error 723 // because the generated code requires it. finish is not called; RecvMsg() 724 // will call it with the stream's status independently. 725 return nil 726 } 727 return io.EOF 728 } 729 if a.statsHandler != nil { 730 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) 731 } 732 if channelz.IsOn() { 733 a.t.IncrMsgSent() 734 } 735 return nil 736} 737 738func (a *csAttempt) recvMsg(m interface{}) (err error) { 739 cs := a.cs 740 var inPayload *stats.InPayload 741 if a.statsHandler != nil { 742 inPayload = &stats.InPayload{ 743 Client: true, 744 } 745 } 746 if !a.decompSet { 747 // Block until we receive headers containing received message encoding. 748 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { 749 if a.dc == nil || a.dc.Type() != ct { 750 // No configured decompressor, or it does not match the incoming 751 // message encoding; attempt to find a registered compressor that does. 752 a.dc = nil 753 a.decomp = encoding.GetCompressor(ct) 754 } 755 } else { 756 // No compression is used; disable our decompressor. 757 a.dc = nil 758 } 759 // Only initialize this state once per stream. 760 a.decompSet = true 761 } 762 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, inPayload, a.decomp) 763 if err != nil { 764 if err == io.EOF { 765 if statusErr := a.s.Status().Err(); statusErr != nil { 766 return statusErr 767 } 768 return io.EOF // indicates successful end of stream. 769 } 770 return toRPCErr(err) 771 } 772 if EnableTracing { 773 a.mu.Lock() 774 if a.trInfo.tr != nil { 775 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 776 } 777 a.mu.Unlock() 778 } 779 if inPayload != nil { 780 a.statsHandler.HandleRPC(cs.ctx, inPayload) 781 } 782 if channelz.IsOn() { 783 a.t.IncrMsgRecv() 784 } 785 if cs.desc.ServerStreams { 786 // Subsequent messages should be received by subsequent RecvMsg calls. 787 return nil 788 } 789 790 // Special handling for non-server-stream rpcs. 791 // This recv expects EOF or errors, so we don't collect inPayload. 792 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) 793 if err == nil { 794 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 795 } 796 if err == io.EOF { 797 return a.s.Status().Err() // non-server streaming Recv returns nil on success 798 } 799 return toRPCErr(err) 800} 801 802func (a *csAttempt) finish(err error) { 803 a.mu.Lock() 804 if a.finished { 805 a.mu.Unlock() 806 return 807 } 808 a.finished = true 809 if err == io.EOF { 810 // Ending a stream with EOF indicates a success. 811 err = nil 812 } 813 if a.s != nil { 814 a.t.CloseStream(a.s, err) 815 } 816 817 if a.done != nil { 818 br := false 819 if a.s != nil { 820 br = a.s.BytesReceived() 821 } 822 a.done(balancer.DoneInfo{ 823 Err: err, 824 BytesSent: a.s != nil, 825 BytesReceived: br, 826 }) 827 } 828 if a.statsHandler != nil { 829 end := &stats.End{ 830 Client: true, 831 BeginTime: a.cs.beginTime, 832 EndTime: time.Now(), 833 Error: err, 834 } 835 a.statsHandler.HandleRPC(a.cs.ctx, end) 836 } 837 if a.trInfo.tr != nil { 838 if err == nil { 839 a.trInfo.tr.LazyPrintf("RPC: [OK]") 840 } else { 841 a.trInfo.tr.LazyPrintf("RPC: [%v]", err) 842 a.trInfo.tr.SetError() 843 } 844 a.trInfo.tr.Finish() 845 a.trInfo.tr = nil 846 } 847 a.mu.Unlock() 848} 849 850// ServerStream defines the server-side behavior of a streaming RPC. 851// 852// All errors returned from ServerStream methods are compatible with the 853// status package. 854type ServerStream interface { 855 // SetHeader sets the header metadata. It may be called multiple times. 856 // When call multiple times, all the provided metadata will be merged. 857 // All the metadata will be sent out when one of the following happens: 858 // - ServerStream.SendHeader() is called; 859 // - The first response is sent out; 860 // - An RPC status is sent out (error or success). 861 SetHeader(metadata.MD) error 862 // SendHeader sends the header metadata. 863 // The provided md and headers set by SetHeader() will be sent. 864 // It fails if called multiple times. 865 SendHeader(metadata.MD) error 866 // SetTrailer sets the trailer metadata which will be sent with the RPC status. 867 // When called more than once, all the provided metadata will be merged. 868 SetTrailer(metadata.MD) 869 // Context returns the context for this stream. 870 Context() context.Context 871 // SendMsg sends a message. On error, SendMsg aborts the stream and the 872 // error is returned directly. 873 // 874 // SendMsg blocks until: 875 // - There is sufficient flow control to schedule m with the transport, or 876 // - The stream is done, or 877 // - The stream breaks. 878 // 879 // SendMsg does not wait until the message is received by the client. An 880 // untimely stream closure may result in lost messages. 881 // 882 // It is safe to have a goroutine calling SendMsg and another goroutine 883 // calling RecvMsg on the same stream at the same time, but it is not safe 884 // to call SendMsg on the same stream in different goroutines. 885 SendMsg(m interface{}) error 886 // RecvMsg blocks until it receives a message into m or the stream is 887 // done. It returns io.EOF when the client has performed a CloseSend. On 888 // any non-EOF error, the stream is aborted and the error contains the 889 // RPC status. 890 // 891 // It is safe to have a goroutine calling SendMsg and another goroutine 892 // calling RecvMsg on the same stream at the same time, but it is not 893 // safe to call RecvMsg on the same stream in different goroutines. 894 RecvMsg(m interface{}) error 895} 896 897// serverStream implements a server side Stream. 898type serverStream struct { 899 ctx context.Context 900 t transport.ServerTransport 901 s *transport.Stream 902 p *parser 903 codec baseCodec 904 905 cp Compressor 906 dc Decompressor 907 comp encoding.Compressor 908 decomp encoding.Compressor 909 910 maxReceiveMessageSize int 911 maxSendMessageSize int 912 trInfo *traceInfo 913 914 statsHandler stats.Handler 915 916 mu sync.Mutex // protects trInfo.tr after the service handler runs. 917} 918 919func (ss *serverStream) Context() context.Context { 920 return ss.ctx 921} 922 923func (ss *serverStream) SetHeader(md metadata.MD) error { 924 if md.Len() == 0 { 925 return nil 926 } 927 return ss.s.SetHeader(md) 928} 929 930func (ss *serverStream) SendHeader(md metadata.MD) error { 931 return ss.t.WriteHeader(ss.s, md) 932} 933 934func (ss *serverStream) SetTrailer(md metadata.MD) { 935 if md.Len() == 0 { 936 return 937 } 938 ss.s.SetTrailer(md) 939} 940 941func (ss *serverStream) SendMsg(m interface{}) (err error) { 942 defer func() { 943 if ss.trInfo != nil { 944 ss.mu.Lock() 945 if ss.trInfo.tr != nil { 946 if err == nil { 947 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 948 } else { 949 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 950 ss.trInfo.tr.SetError() 951 } 952 } 953 ss.mu.Unlock() 954 } 955 if err != nil && err != io.EOF { 956 st, _ := status.FromError(toRPCErr(err)) 957 ss.t.WriteStatus(ss.s, st) 958 } 959 if channelz.IsOn() && err == nil { 960 ss.t.IncrMsgSent() 961 } 962 }() 963 data, err := encode(ss.codec, m) 964 if err != nil { 965 return err 966 } 967 compData, err := compress(data, ss.cp, ss.comp) 968 if err != nil { 969 return err 970 } 971 hdr, payload := msgHeader(data, compData) 972 // TODO(dfawley): should we be checking len(data) instead? 973 if len(payload) > ss.maxSendMessageSize { 974 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) 975 } 976 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { 977 return toRPCErr(err) 978 } 979 if ss.statsHandler != nil { 980 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) 981 } 982 return nil 983} 984 985func (ss *serverStream) RecvMsg(m interface{}) (err error) { 986 defer func() { 987 if ss.trInfo != nil { 988 ss.mu.Lock() 989 if ss.trInfo.tr != nil { 990 if err == nil { 991 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 992 } else if err != io.EOF { 993 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 994 ss.trInfo.tr.SetError() 995 } 996 } 997 ss.mu.Unlock() 998 } 999 if err != nil && err != io.EOF { 1000 st, _ := status.FromError(toRPCErr(err)) 1001 ss.t.WriteStatus(ss.s, st) 1002 } 1003 if channelz.IsOn() && err == nil { 1004 ss.t.IncrMsgRecv() 1005 } 1006 }() 1007 var inPayload *stats.InPayload 1008 if ss.statsHandler != nil { 1009 inPayload = &stats.InPayload{} 1010 } 1011 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload, ss.decomp); err != nil { 1012 if err == io.EOF { 1013 return err 1014 } 1015 if err == io.ErrUnexpectedEOF { 1016 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 1017 } 1018 return toRPCErr(err) 1019 } 1020 if inPayload != nil { 1021 ss.statsHandler.HandleRPC(ss.s.Context(), inPayload) 1022 } 1023 return nil 1024} 1025 1026// MethodFromServerStream returns the method string for the input stream. 1027// The returned string is in the format of "/service/method". 1028func MethodFromServerStream(stream ServerStream) (string, bool) { 1029 return Method(stream.Context()) 1030} 1031