1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package grpc 20 21import ( 22 "context" 23 "errors" 24 "io" 25 "math" 26 "strconv" 27 "sync" 28 "time" 29 30 "golang.org/x/net/trace" 31 "google.golang.org/grpc/balancer" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/connectivity" 34 "google.golang.org/grpc/encoding" 35 "google.golang.org/grpc/grpclog" 36 "google.golang.org/grpc/internal/balancerload" 37 "google.golang.org/grpc/internal/binarylog" 38 "google.golang.org/grpc/internal/channelz" 39 "google.golang.org/grpc/internal/grpcrand" 40 "google.golang.org/grpc/internal/transport" 41 "google.golang.org/grpc/metadata" 42 "google.golang.org/grpc/peer" 43 "google.golang.org/grpc/stats" 44 "google.golang.org/grpc/status" 45) 46 47// StreamHandler defines the handler called by gRPC server to complete the 48// execution of a streaming RPC. If a StreamHandler returns an error, it 49// should be produced by the status package, or else gRPC will use 50// codes.Unknown as the status code and err.Error() as the status message 51// of the RPC. 52type StreamHandler func(srv interface{}, stream ServerStream) error 53 54// StreamDesc represents a streaming RPC service's method specification. 55type StreamDesc struct { 56 StreamName string 57 Handler StreamHandler 58 59 // At least one of these is true. 60 ServerStreams bool 61 ClientStreams bool 62} 63 64// Stream defines the common interface a client or server stream has to satisfy. 65// 66// Deprecated: See ClientStream and ServerStream documentation instead. 67type Stream interface { 68 // Deprecated: See ClientStream and ServerStream documentation instead. 69 Context() context.Context 70 // Deprecated: See ClientStream and ServerStream documentation instead. 71 SendMsg(m interface{}) error 72 // Deprecated: See ClientStream and ServerStream documentation instead. 73 RecvMsg(m interface{}) error 74} 75 76// ClientStream defines the client-side behavior of a streaming RPC. 77// 78// All errors returned from ClientStream methods are compatible with the 79// status package. 80type ClientStream interface { 81 // Header returns the header metadata received from the server if there 82 // is any. It blocks if the metadata is not ready to read. 83 Header() (metadata.MD, error) 84 // Trailer returns the trailer metadata from the server, if there is any. 85 // It must only be called after stream.CloseAndRecv has returned, or 86 // stream.Recv has returned a non-nil error (including io.EOF). 87 Trailer() metadata.MD 88 // CloseSend closes the send direction of the stream. It closes the stream 89 // when non-nil error is met. It is also not safe to call CloseSend 90 // concurrently with SendMsg. 91 CloseSend() error 92 // Context returns the context for this stream. 93 // 94 // It should not be called until after Header or RecvMsg has returned. Once 95 // called, subsequent client-side retries are disabled. 96 Context() context.Context 97 // SendMsg is generally called by generated code. On error, SendMsg aborts 98 // the stream. If the error was generated by the client, the status is 99 // returned directly; otherwise, io.EOF is returned and the status of 100 // the stream may be discovered using RecvMsg. 101 // 102 // SendMsg blocks until: 103 // - There is sufficient flow control to schedule m with the transport, or 104 // - The stream is done, or 105 // - The stream breaks. 106 // 107 // SendMsg does not wait until the message is received by the server. An 108 // untimely stream closure may result in lost messages. To ensure delivery, 109 // users should ensure the RPC completed successfully using RecvMsg. 110 // 111 // It is safe to have a goroutine calling SendMsg and another goroutine 112 // calling RecvMsg on the same stream at the same time, but it is not safe 113 // to call SendMsg on the same stream in different goroutines. It is also 114 // not safe to call CloseSend concurrently with SendMsg. 115 SendMsg(m interface{}) error 116 // RecvMsg blocks until it receives a message into m or the stream is 117 // done. It returns io.EOF when the stream completes successfully. On 118 // any other error, the stream is aborted and the error contains the RPC 119 // status. 120 // 121 // It is safe to have a goroutine calling SendMsg and another goroutine 122 // calling RecvMsg on the same stream at the same time, but it is not 123 // safe to call RecvMsg on the same stream in different goroutines. 124 RecvMsg(m interface{}) error 125} 126 127// NewStream creates a new Stream for the client side. This is typically 128// called by generated code. ctx is used for the lifetime of the stream. 129// 130// To ensure resources are not leaked due to the stream returned, one of the following 131// actions must be performed: 132// 133// 1. Call Close on the ClientConn. 134// 2. Cancel the context provided. 135// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated 136// client-streaming RPC, for instance, might use the helper function 137// CloseAndRecv (note that CloseSend does not Recv, therefore is not 138// guaranteed to release all resources). 139// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. 140// 141// If none of the above happen, a goroutine and a context will be leaked, and grpc 142// will not call the optionally-configured stats handler with a stats.End message. 143func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { 144 // allow interceptor to see all applicable call options, which means those 145 // configured as defaults from dial option as well as per-call options 146 opts = combine(cc.dopts.callOptions, opts) 147 148 if cc.dopts.streamInt != nil { 149 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) 150 } 151 return newClientStream(ctx, desc, cc, method, opts...) 152} 153 154// NewClientStream is a wrapper for ClientConn.NewStream. 155func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 156 return cc.NewStream(ctx, desc, method, opts...) 157} 158 159func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { 160 if channelz.IsOn() { 161 cc.incrCallsStarted() 162 defer func() { 163 if err != nil { 164 cc.incrCallsFailed() 165 } 166 }() 167 } 168 c := defaultCallInfo() 169 // Provide an opportunity for the first RPC to see the first service config 170 // provided by the resolver. 171 if err := cc.waitForResolvedAddrs(ctx); err != nil { 172 return nil, err 173 } 174 mc := cc.GetMethodConfig(method) 175 if mc.WaitForReady != nil { 176 c.failFast = !*mc.WaitForReady 177 } 178 179 // Possible context leak: 180 // The cancel function for the child context we create will only be called 181 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if 182 // an error is generated by SendMsg. 183 // https://github.com/grpc/grpc-go/issues/1818. 184 var cancel context.CancelFunc 185 if mc.Timeout != nil && *mc.Timeout >= 0 { 186 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) 187 } else { 188 ctx, cancel = context.WithCancel(ctx) 189 } 190 defer func() { 191 if err != nil { 192 cancel() 193 } 194 }() 195 196 for _, o := range opts { 197 if err := o.before(c); err != nil { 198 return nil, toRPCErr(err) 199 } 200 } 201 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) 202 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 203 if err := setCallInfoCodec(c); err != nil { 204 return nil, err 205 } 206 207 callHdr := &transport.CallHdr{ 208 Host: cc.authority, 209 Method: method, 210 ContentSubtype: c.contentSubtype, 211 } 212 213 // Set our outgoing compression according to the UseCompressor CallOption, if 214 // set. In that case, also find the compressor from the encoding package. 215 // Otherwise, use the compressor configured by the WithCompressor DialOption, 216 // if set. 217 var cp Compressor 218 var comp encoding.Compressor 219 if ct := c.compressorType; ct != "" { 220 callHdr.SendCompress = ct 221 if ct != encoding.Identity { 222 comp = encoding.GetCompressor(ct) 223 if comp == nil { 224 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) 225 } 226 } 227 } else if cc.dopts.cp != nil { 228 callHdr.SendCompress = cc.dopts.cp.Type() 229 cp = cc.dopts.cp 230 } 231 if c.creds != nil { 232 callHdr.Creds = c.creds 233 } 234 var trInfo *traceInfo 235 if EnableTracing { 236 trInfo = &traceInfo{ 237 tr: trace.New("grpc.Sent."+methodFamily(method), method), 238 firstLine: firstLine{ 239 client: true, 240 }, 241 } 242 if deadline, ok := ctx.Deadline(); ok { 243 trInfo.firstLine.deadline = time.Until(deadline) 244 } 245 trInfo.tr.LazyLog(&trInfo.firstLine, false) 246 ctx = trace.NewContext(ctx, trInfo.tr) 247 } 248 ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp) 249 sh := cc.dopts.copts.StatsHandler 250 var beginTime time.Time 251 if sh != nil { 252 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) 253 beginTime = time.Now() 254 begin := &stats.Begin{ 255 Client: true, 256 BeginTime: beginTime, 257 FailFast: c.failFast, 258 } 259 sh.HandleRPC(ctx, begin) 260 } 261 262 cs := &clientStream{ 263 callHdr: callHdr, 264 ctx: ctx, 265 methodConfig: &mc, 266 opts: opts, 267 callInfo: c, 268 cc: cc, 269 desc: desc, 270 codec: c.codec, 271 cp: cp, 272 comp: comp, 273 cancel: cancel, 274 beginTime: beginTime, 275 firstAttempt: true, 276 } 277 if !cc.dopts.disableRetry { 278 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) 279 } 280 cs.binlog = binarylog.GetMethodLogger(method) 281 282 cs.callInfo.stream = cs 283 // Only this initial attempt has stats/tracing. 284 // TODO(dfawley): move to newAttempt when per-attempt stats are implemented. 285 if err := cs.newAttemptLocked(sh, trInfo); err != nil { 286 cs.finish(err) 287 return nil, err 288 } 289 290 op := func(a *csAttempt) error { return a.newStream() } 291 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { 292 cs.finish(err) 293 return nil, err 294 } 295 296 if cs.binlog != nil { 297 md, _ := metadata.FromOutgoingContext(ctx) 298 logEntry := &binarylog.ClientHeader{ 299 OnClientSide: true, 300 Header: md, 301 MethodName: method, 302 Authority: cs.cc.authority, 303 } 304 if deadline, ok := ctx.Deadline(); ok { 305 logEntry.Timeout = time.Until(deadline) 306 if logEntry.Timeout < 0 { 307 logEntry.Timeout = 0 308 } 309 } 310 cs.binlog.Log(logEntry) 311 } 312 313 if desc != unaryStreamDesc { 314 // Listen on cc and stream contexts to cleanup when the user closes the 315 // ClientConn or cancels the stream context. In all other cases, an error 316 // should already be injected into the recv buffer by the transport, which 317 // the client will eventually receive, and then we will cancel the stream's 318 // context in clientStream.finish. 319 go func() { 320 select { 321 case <-cc.ctx.Done(): 322 cs.finish(ErrClientConnClosing) 323 case <-ctx.Done(): 324 cs.finish(toRPCErr(ctx.Err())) 325 } 326 }() 327 } 328 return cs, nil 329} 330 331func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) error { 332 cs.attempt = &csAttempt{ 333 cs: cs, 334 dc: cs.cc.dopts.dc, 335 statsHandler: sh, 336 trInfo: trInfo, 337 } 338 339 if err := cs.ctx.Err(); err != nil { 340 return toRPCErr(err) 341 } 342 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method) 343 if err != nil { 344 return err 345 } 346 if trInfo != nil { 347 trInfo.firstLine.SetRemoteAddr(t.RemoteAddr()) 348 } 349 cs.attempt.t = t 350 cs.attempt.done = done 351 return nil 352} 353 354func (a *csAttempt) newStream() error { 355 cs := a.cs 356 cs.callHdr.PreviousAttempts = cs.numRetries 357 s, err := a.t.NewStream(cs.ctx, cs.callHdr) 358 if err != nil { 359 return toRPCErr(err) 360 } 361 cs.attempt.s = s 362 cs.attempt.p = &parser{r: s} 363 return nil 364} 365 366// clientStream implements a client side Stream. 367type clientStream struct { 368 callHdr *transport.CallHdr 369 opts []CallOption 370 callInfo *callInfo 371 cc *ClientConn 372 desc *StreamDesc 373 374 codec baseCodec 375 cp Compressor 376 comp encoding.Compressor 377 378 cancel context.CancelFunc // cancels all attempts 379 380 sentLast bool // sent an end stream 381 beginTime time.Time 382 383 methodConfig *MethodConfig 384 385 ctx context.Context // the application's context, wrapped by stats/tracing 386 387 retryThrottler *retryThrottler // The throttler active when the RPC began. 388 389 binlog *binarylog.MethodLogger // Binary logger, can be nil. 390 // serverHeaderBinlogged is a boolean for whether server header has been 391 // logged. Server header will be logged when the first time one of those 392 // happens: stream.Header(), stream.Recv(). 393 // 394 // It's only read and used by Recv() and Header(), so it doesn't need to be 395 // synchronized. 396 serverHeaderBinlogged bool 397 398 mu sync.Mutex 399 firstAttempt bool // if true, transparent retry is valid 400 numRetries int // exclusive of transparent retry attempt(s) 401 numRetriesSincePushback int // retries since pushback; to reset backoff 402 finished bool // TODO: replace with atomic cmpxchg or sync.Once? 403 attempt *csAttempt // the active client stream attempt 404 // TODO(hedging): hedging will have multiple attempts simultaneously. 405 committed bool // active attempt committed for retry? 406 buffer []func(a *csAttempt) error // operations to replay on retry 407 bufferSize int // current size of buffer 408} 409 410// csAttempt implements a single transport stream attempt within a 411// clientStream. 412type csAttempt struct { 413 cs *clientStream 414 t transport.ClientTransport 415 s *transport.Stream 416 p *parser 417 done func(balancer.DoneInfo) 418 419 finished bool 420 dc Decompressor 421 decomp encoding.Compressor 422 decompSet bool 423 424 mu sync.Mutex // guards trInfo.tr 425 // trInfo may be nil (if EnableTracing is false). 426 // trInfo.tr is set when created (if EnableTracing is true), 427 // and cleared when the finish method is called. 428 trInfo *traceInfo 429 430 statsHandler stats.Handler 431} 432 433func (cs *clientStream) commitAttemptLocked() { 434 cs.committed = true 435 cs.buffer = nil 436} 437 438func (cs *clientStream) commitAttempt() { 439 cs.mu.Lock() 440 cs.commitAttemptLocked() 441 cs.mu.Unlock() 442} 443 444// shouldRetry returns nil if the RPC should be retried; otherwise it returns 445// the error that should be returned by the operation. 446func (cs *clientStream) shouldRetry(err error) error { 447 if cs.attempt.s == nil && !cs.callInfo.failFast { 448 // In the event of any error from NewStream (attempt.s == nil), we 449 // never attempted to write anything to the wire, so we can retry 450 // indefinitely for non-fail-fast RPCs. 451 return nil 452 } 453 if cs.finished || cs.committed { 454 // RPC is finished or committed; cannot retry. 455 return err 456 } 457 // Wait for the trailers. 458 if cs.attempt.s != nil { 459 <-cs.attempt.s.Done() 460 } 461 if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { 462 // First attempt, wait-for-ready, stream unprocessed: transparently retry. 463 cs.firstAttempt = false 464 return nil 465 } 466 cs.firstAttempt = false 467 if cs.cc.dopts.disableRetry { 468 return err 469 } 470 471 pushback := 0 472 hasPushback := false 473 if cs.attempt.s != nil { 474 if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to { 475 return err 476 } 477 478 // TODO(retry): Move down if the spec changes to not check server pushback 479 // before considering this a failure for throttling. 480 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] 481 if len(sps) == 1 { 482 var e error 483 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { 484 grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0]) 485 cs.retryThrottler.throttle() // This counts as a failure for throttling. 486 return err 487 } 488 hasPushback = true 489 } else if len(sps) > 1 { 490 grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps) 491 cs.retryThrottler.throttle() // This counts as a failure for throttling. 492 return err 493 } 494 } 495 496 var code codes.Code 497 if cs.attempt.s != nil { 498 code = cs.attempt.s.Status().Code() 499 } else { 500 code = status.Convert(err).Code() 501 } 502 503 rp := cs.methodConfig.retryPolicy 504 if rp == nil || !rp.retryableStatusCodes[code] { 505 return err 506 } 507 508 // Note: the ordering here is important; we count this as a failure 509 // only if the code matched a retryable code. 510 if cs.retryThrottler.throttle() { 511 return err 512 } 513 if cs.numRetries+1 >= rp.maxAttempts { 514 return err 515 } 516 517 var dur time.Duration 518 if hasPushback { 519 dur = time.Millisecond * time.Duration(pushback) 520 cs.numRetriesSincePushback = 0 521 } else { 522 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback)) 523 cur := float64(rp.initialBackoff) * fact 524 if max := float64(rp.maxBackoff); cur > max { 525 cur = max 526 } 527 dur = time.Duration(grpcrand.Int63n(int64(cur))) 528 cs.numRetriesSincePushback++ 529 } 530 531 // TODO(dfawley): we could eagerly fail here if dur puts us past the 532 // deadline, but unsure if it is worth doing. 533 t := time.NewTimer(dur) 534 select { 535 case <-t.C: 536 cs.numRetries++ 537 return nil 538 case <-cs.ctx.Done(): 539 t.Stop() 540 return status.FromContextError(cs.ctx.Err()).Err() 541 } 542} 543 544// Returns nil if a retry was performed and succeeded; error otherwise. 545func (cs *clientStream) retryLocked(lastErr error) error { 546 for { 547 cs.attempt.finish(lastErr) 548 if err := cs.shouldRetry(lastErr); err != nil { 549 cs.commitAttemptLocked() 550 return err 551 } 552 if err := cs.newAttemptLocked(nil, nil); err != nil { 553 return err 554 } 555 if lastErr = cs.replayBufferLocked(); lastErr == nil { 556 return nil 557 } 558 } 559} 560 561func (cs *clientStream) Context() context.Context { 562 cs.commitAttempt() 563 // No need to lock before using attempt, since we know it is committed and 564 // cannot change. 565 return cs.attempt.s.Context() 566} 567 568func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { 569 cs.mu.Lock() 570 for { 571 if cs.committed { 572 cs.mu.Unlock() 573 return op(cs.attempt) 574 } 575 a := cs.attempt 576 cs.mu.Unlock() 577 err := op(a) 578 cs.mu.Lock() 579 if a != cs.attempt { 580 // We started another attempt already. 581 continue 582 } 583 if err == io.EOF { 584 <-a.s.Done() 585 } 586 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { 587 onSuccess() 588 cs.mu.Unlock() 589 return err 590 } 591 if err := cs.retryLocked(err); err != nil { 592 cs.mu.Unlock() 593 return err 594 } 595 } 596} 597 598func (cs *clientStream) Header() (metadata.MD, error) { 599 var m metadata.MD 600 err := cs.withRetry(func(a *csAttempt) error { 601 var err error 602 m, err = a.s.Header() 603 return toRPCErr(err) 604 }, cs.commitAttemptLocked) 605 if err != nil { 606 cs.finish(err) 607 return nil, err 608 } 609 if cs.binlog != nil && !cs.serverHeaderBinlogged { 610 // Only log if binary log is on and header has not been logged. 611 logEntry := &binarylog.ServerHeader{ 612 OnClientSide: true, 613 Header: m, 614 PeerAddr: nil, 615 } 616 if peer, ok := peer.FromContext(cs.Context()); ok { 617 logEntry.PeerAddr = peer.Addr 618 } 619 cs.binlog.Log(logEntry) 620 cs.serverHeaderBinlogged = true 621 } 622 return m, err 623} 624 625func (cs *clientStream) Trailer() metadata.MD { 626 // On RPC failure, we never need to retry, because usage requires that 627 // RecvMsg() returned a non-nil error before calling this function is valid. 628 // We would have retried earlier if necessary. 629 // 630 // Commit the attempt anyway, just in case users are not following those 631 // directions -- it will prevent races and should not meaningfully impact 632 // performance. 633 cs.commitAttempt() 634 if cs.attempt.s == nil { 635 return nil 636 } 637 return cs.attempt.s.Trailer() 638} 639 640func (cs *clientStream) replayBufferLocked() error { 641 a := cs.attempt 642 for _, f := range cs.buffer { 643 if err := f(a); err != nil { 644 return err 645 } 646 } 647 return nil 648} 649 650func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) { 651 // Note: we still will buffer if retry is disabled (for transparent retries). 652 if cs.committed { 653 return 654 } 655 cs.bufferSize += sz 656 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { 657 cs.commitAttemptLocked() 658 return 659 } 660 cs.buffer = append(cs.buffer, op) 661} 662 663func (cs *clientStream) SendMsg(m interface{}) (err error) { 664 defer func() { 665 if err != nil && err != io.EOF { 666 // Call finish on the client stream for errors generated by this SendMsg 667 // call, as these indicate problems created by this client. (Transport 668 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real 669 // error will be returned from RecvMsg eventually in that case, or be 670 // retried.) 671 cs.finish(err) 672 } 673 }() 674 if cs.sentLast { 675 return status.Errorf(codes.Internal, "SendMsg called after CloseSend") 676 } 677 if !cs.desc.ClientStreams { 678 cs.sentLast = true 679 } 680 681 // load hdr, payload, data 682 hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) 683 if err != nil { 684 return err 685 } 686 687 // TODO(dfawley): should we be checking len(data) instead? 688 if len(payload) > *cs.callInfo.maxSendMessageSize { 689 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) 690 } 691 msgBytes := data // Store the pointer before setting to nil. For binary logging. 692 op := func(a *csAttempt) error { 693 err := a.sendMsg(m, hdr, payload, data) 694 // nil out the message and uncomp when replaying; they are only needed for 695 // stats which is disabled for subsequent attempts. 696 m, data = nil, nil 697 return err 698 } 699 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) 700 if cs.binlog != nil && err == nil { 701 cs.binlog.Log(&binarylog.ClientMessage{ 702 OnClientSide: true, 703 Message: msgBytes, 704 }) 705 } 706 return 707} 708 709func (cs *clientStream) RecvMsg(m interface{}) error { 710 if cs.binlog != nil && !cs.serverHeaderBinlogged { 711 // Call Header() to binary log header if it's not already logged. 712 cs.Header() 713 } 714 var recvInfo *payloadInfo 715 if cs.binlog != nil { 716 recvInfo = &payloadInfo{} 717 } 718 err := cs.withRetry(func(a *csAttempt) error { 719 return a.recvMsg(m, recvInfo) 720 }, cs.commitAttemptLocked) 721 if cs.binlog != nil && err == nil { 722 cs.binlog.Log(&binarylog.ServerMessage{ 723 OnClientSide: true, 724 Message: recvInfo.uncompressedBytes, 725 }) 726 } 727 if err != nil || !cs.desc.ServerStreams { 728 // err != nil or non-server-streaming indicates end of stream. 729 cs.finish(err) 730 731 if cs.binlog != nil { 732 // finish will not log Trailer. Log Trailer here. 733 logEntry := &binarylog.ServerTrailer{ 734 OnClientSide: true, 735 Trailer: cs.Trailer(), 736 Err: err, 737 } 738 if logEntry.Err == io.EOF { 739 logEntry.Err = nil 740 } 741 if peer, ok := peer.FromContext(cs.Context()); ok { 742 logEntry.PeerAddr = peer.Addr 743 } 744 cs.binlog.Log(logEntry) 745 } 746 } 747 return err 748} 749 750func (cs *clientStream) CloseSend() error { 751 if cs.sentLast { 752 // TODO: return an error and finish the stream instead, due to API misuse? 753 return nil 754 } 755 cs.sentLast = true 756 op := func(a *csAttempt) error { 757 a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) 758 // Always return nil; io.EOF is the only error that might make sense 759 // instead, but there is no need to signal the client to call RecvMsg 760 // as the only use left for the stream after CloseSend is to call 761 // RecvMsg. This also matches historical behavior. 762 return nil 763 } 764 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }) 765 if cs.binlog != nil { 766 cs.binlog.Log(&binarylog.ClientHalfClose{ 767 OnClientSide: true, 768 }) 769 } 770 // We never returned an error here for reasons. 771 return nil 772} 773 774func (cs *clientStream) finish(err error) { 775 if err == io.EOF { 776 // Ending a stream with EOF indicates a success. 777 err = nil 778 } 779 cs.mu.Lock() 780 if cs.finished { 781 cs.mu.Unlock() 782 return 783 } 784 cs.finished = true 785 cs.commitAttemptLocked() 786 cs.mu.Unlock() 787 // For binary logging. only log cancel in finish (could be caused by RPC ctx 788 // canceled or ClientConn closed). Trailer will be logged in RecvMsg. 789 // 790 // Only one of cancel or trailer needs to be logged. In the cases where 791 // users don't call RecvMsg, users must have already canceled the RPC. 792 if cs.binlog != nil && status.Code(err) == codes.Canceled { 793 cs.binlog.Log(&binarylog.Cancel{ 794 OnClientSide: true, 795 }) 796 } 797 if err == nil { 798 cs.retryThrottler.successfulRPC() 799 } 800 if channelz.IsOn() { 801 if err != nil { 802 cs.cc.incrCallsFailed() 803 } else { 804 cs.cc.incrCallsSucceeded() 805 } 806 } 807 if cs.attempt != nil { 808 cs.attempt.finish(err) 809 } 810 // after functions all rely upon having a stream. 811 if cs.attempt.s != nil { 812 for _, o := range cs.opts { 813 o.after(cs.callInfo) 814 } 815 } 816 cs.cancel() 817} 818 819func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { 820 cs := a.cs 821 if a.trInfo != nil { 822 a.mu.Lock() 823 if a.trInfo.tr != nil { 824 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 825 } 826 a.mu.Unlock() 827 } 828 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { 829 if !cs.desc.ClientStreams { 830 // For non-client-streaming RPCs, we return nil instead of EOF on error 831 // because the generated code requires it. finish is not called; RecvMsg() 832 // will call it with the stream's status independently. 833 return nil 834 } 835 return io.EOF 836 } 837 if a.statsHandler != nil { 838 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) 839 } 840 if channelz.IsOn() { 841 a.t.IncrMsgSent() 842 } 843 return nil 844} 845 846func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { 847 cs := a.cs 848 if a.statsHandler != nil && payInfo == nil { 849 payInfo = &payloadInfo{} 850 } 851 852 if !a.decompSet { 853 // Block until we receive headers containing received message encoding. 854 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { 855 if a.dc == nil || a.dc.Type() != ct { 856 // No configured decompressor, or it does not match the incoming 857 // message encoding; attempt to find a registered compressor that does. 858 a.dc = nil 859 a.decomp = encoding.GetCompressor(ct) 860 } 861 } else { 862 // No compression is used; disable our decompressor. 863 a.dc = nil 864 } 865 // Only initialize this state once per stream. 866 a.decompSet = true 867 } 868 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp) 869 if err != nil { 870 if err == io.EOF { 871 if statusErr := a.s.Status().Err(); statusErr != nil { 872 return statusErr 873 } 874 return io.EOF // indicates successful end of stream. 875 } 876 return toRPCErr(err) 877 } 878 if a.trInfo != nil { 879 a.mu.Lock() 880 if a.trInfo.tr != nil { 881 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 882 } 883 a.mu.Unlock() 884 } 885 if a.statsHandler != nil { 886 a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{ 887 Client: true, 888 RecvTime: time.Now(), 889 Payload: m, 890 // TODO truncate large payload. 891 Data: payInfo.uncompressedBytes, 892 WireLength: payInfo.wireLength, 893 Length: len(payInfo.uncompressedBytes), 894 }) 895 } 896 if channelz.IsOn() { 897 a.t.IncrMsgRecv() 898 } 899 if cs.desc.ServerStreams { 900 // Subsequent messages should be received by subsequent RecvMsg calls. 901 return nil 902 } 903 // Special handling for non-server-stream rpcs. 904 // This recv expects EOF or errors, so we don't collect inPayload. 905 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) 906 if err == nil { 907 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 908 } 909 if err == io.EOF { 910 return a.s.Status().Err() // non-server streaming Recv returns nil on success 911 } 912 return toRPCErr(err) 913} 914 915func (a *csAttempt) finish(err error) { 916 a.mu.Lock() 917 if a.finished { 918 a.mu.Unlock() 919 return 920 } 921 a.finished = true 922 if err == io.EOF { 923 // Ending a stream with EOF indicates a success. 924 err = nil 925 } 926 var tr metadata.MD 927 if a.s != nil { 928 a.t.CloseStream(a.s, err) 929 tr = a.s.Trailer() 930 } 931 932 if a.done != nil { 933 br := false 934 if a.s != nil { 935 br = a.s.BytesReceived() 936 } 937 a.done(balancer.DoneInfo{ 938 Err: err, 939 Trailer: tr, 940 BytesSent: a.s != nil, 941 BytesReceived: br, 942 ServerLoad: balancerload.Parse(tr), 943 }) 944 } 945 if a.statsHandler != nil { 946 end := &stats.End{ 947 Client: true, 948 BeginTime: a.cs.beginTime, 949 EndTime: time.Now(), 950 Trailer: tr, 951 Error: err, 952 } 953 a.statsHandler.HandleRPC(a.cs.ctx, end) 954 } 955 if a.trInfo != nil && a.trInfo.tr != nil { 956 if err == nil { 957 a.trInfo.tr.LazyPrintf("RPC: [OK]") 958 } else { 959 a.trInfo.tr.LazyPrintf("RPC: [%v]", err) 960 a.trInfo.tr.SetError() 961 } 962 a.trInfo.tr.Finish() 963 a.trInfo.tr = nil 964 } 965 a.mu.Unlock() 966} 967 968func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) { 969 ac.mu.Lock() 970 if ac.transport != t { 971 ac.mu.Unlock() 972 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") 973 } 974 // transition to CONNECTING state when an attempt starts 975 if ac.state != connectivity.Connecting { 976 ac.updateConnectivityState(connectivity.Connecting) 977 ac.cc.handleSubConnStateChange(ac.acbw, ac.state) 978 } 979 ac.mu.Unlock() 980 981 if t == nil { 982 // TODO: return RPC error here? 983 return nil, errors.New("transport provided is nil") 984 } 985 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct. 986 c := &callInfo{} 987 988 for _, o := range opts { 989 if err := o.before(c); err != nil { 990 return nil, toRPCErr(err) 991 } 992 } 993 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 994 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize) 995 996 // Possible context leak: 997 // The cancel function for the child context we create will only be called 998 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if 999 // an error is generated by SendMsg. 1000 // https://github.com/grpc/grpc-go/issues/1818. 1001 ctx, cancel := context.WithCancel(ctx) 1002 defer func() { 1003 if err != nil { 1004 cancel() 1005 } 1006 }() 1007 1008 if err := setCallInfoCodec(c); err != nil { 1009 return nil, err 1010 } 1011 1012 callHdr := &transport.CallHdr{ 1013 Host: ac.cc.authority, 1014 Method: method, 1015 ContentSubtype: c.contentSubtype, 1016 } 1017 1018 // Set our outgoing compression according to the UseCompressor CallOption, if 1019 // set. In that case, also find the compressor from the encoding package. 1020 // Otherwise, use the compressor configured by the WithCompressor DialOption, 1021 // if set. 1022 var cp Compressor 1023 var comp encoding.Compressor 1024 if ct := c.compressorType; ct != "" { 1025 callHdr.SendCompress = ct 1026 if ct != encoding.Identity { 1027 comp = encoding.GetCompressor(ct) 1028 if comp == nil { 1029 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) 1030 } 1031 } 1032 } else if ac.cc.dopts.cp != nil { 1033 callHdr.SendCompress = ac.cc.dopts.cp.Type() 1034 cp = ac.cc.dopts.cp 1035 } 1036 if c.creds != nil { 1037 callHdr.Creds = c.creds 1038 } 1039 1040 as := &addrConnStream{ 1041 callHdr: callHdr, 1042 ac: ac, 1043 ctx: ctx, 1044 cancel: cancel, 1045 opts: opts, 1046 callInfo: c, 1047 desc: desc, 1048 codec: c.codec, 1049 cp: cp, 1050 comp: comp, 1051 t: t, 1052 } 1053 1054 as.callInfo.stream = as 1055 s, err := as.t.NewStream(as.ctx, as.callHdr) 1056 if err != nil { 1057 err = toRPCErr(err) 1058 return nil, err 1059 } 1060 as.s = s 1061 as.p = &parser{r: s} 1062 ac.incrCallsStarted() 1063 if desc != unaryStreamDesc { 1064 // Listen on cc and stream contexts to cleanup when the user closes the 1065 // ClientConn or cancels the stream context. In all other cases, an error 1066 // should already be injected into the recv buffer by the transport, which 1067 // the client will eventually receive, and then we will cancel the stream's 1068 // context in clientStream.finish. 1069 go func() { 1070 select { 1071 case <-ac.ctx.Done(): 1072 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing")) 1073 case <-ctx.Done(): 1074 as.finish(toRPCErr(ctx.Err())) 1075 } 1076 }() 1077 } 1078 return as, nil 1079} 1080 1081type addrConnStream struct { 1082 s *transport.Stream 1083 ac *addrConn 1084 callHdr *transport.CallHdr 1085 cancel context.CancelFunc 1086 opts []CallOption 1087 callInfo *callInfo 1088 t transport.ClientTransport 1089 ctx context.Context 1090 sentLast bool 1091 desc *StreamDesc 1092 codec baseCodec 1093 cp Compressor 1094 comp encoding.Compressor 1095 decompSet bool 1096 dc Decompressor 1097 decomp encoding.Compressor 1098 p *parser 1099 mu sync.Mutex 1100 finished bool 1101} 1102 1103func (as *addrConnStream) Header() (metadata.MD, error) { 1104 m, err := as.s.Header() 1105 if err != nil { 1106 as.finish(toRPCErr(err)) 1107 } 1108 return m, err 1109} 1110 1111func (as *addrConnStream) Trailer() metadata.MD { 1112 return as.s.Trailer() 1113} 1114 1115func (as *addrConnStream) CloseSend() error { 1116 if as.sentLast { 1117 // TODO: return an error and finish the stream instead, due to API misuse? 1118 return nil 1119 } 1120 as.sentLast = true 1121 1122 as.t.Write(as.s, nil, nil, &transport.Options{Last: true}) 1123 // Always return nil; io.EOF is the only error that might make sense 1124 // instead, but there is no need to signal the client to call RecvMsg 1125 // as the only use left for the stream after CloseSend is to call 1126 // RecvMsg. This also matches historical behavior. 1127 return nil 1128} 1129 1130func (as *addrConnStream) Context() context.Context { 1131 return as.s.Context() 1132} 1133 1134func (as *addrConnStream) SendMsg(m interface{}) (err error) { 1135 defer func() { 1136 if err != nil && err != io.EOF { 1137 // Call finish on the client stream for errors generated by this SendMsg 1138 // call, as these indicate problems created by this client. (Transport 1139 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real 1140 // error will be returned from RecvMsg eventually in that case, or be 1141 // retried.) 1142 as.finish(err) 1143 } 1144 }() 1145 if as.sentLast { 1146 return status.Errorf(codes.Internal, "SendMsg called after CloseSend") 1147 } 1148 if !as.desc.ClientStreams { 1149 as.sentLast = true 1150 } 1151 1152 // load hdr, payload, data 1153 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp) 1154 if err != nil { 1155 return err 1156 } 1157 1158 // TODO(dfawley): should we be checking len(data) instead? 1159 if len(payld) > *as.callInfo.maxSendMessageSize { 1160 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) 1161 } 1162 1163 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { 1164 if !as.desc.ClientStreams { 1165 // For non-client-streaming RPCs, we return nil instead of EOF on error 1166 // because the generated code requires it. finish is not called; RecvMsg() 1167 // will call it with the stream's status independently. 1168 return nil 1169 } 1170 return io.EOF 1171 } 1172 1173 if channelz.IsOn() { 1174 as.t.IncrMsgSent() 1175 } 1176 return nil 1177} 1178 1179func (as *addrConnStream) RecvMsg(m interface{}) (err error) { 1180 defer func() { 1181 if err != nil || !as.desc.ServerStreams { 1182 // err != nil or non-server-streaming indicates end of stream. 1183 as.finish(err) 1184 } 1185 }() 1186 1187 if !as.decompSet { 1188 // Block until we receive headers containing received message encoding. 1189 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity { 1190 if as.dc == nil || as.dc.Type() != ct { 1191 // No configured decompressor, or it does not match the incoming 1192 // message encoding; attempt to find a registered compressor that does. 1193 as.dc = nil 1194 as.decomp = encoding.GetCompressor(ct) 1195 } 1196 } else { 1197 // No compression is used; disable our decompressor. 1198 as.dc = nil 1199 } 1200 // Only initialize this state once per stream. 1201 as.decompSet = true 1202 } 1203 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) 1204 if err != nil { 1205 if err == io.EOF { 1206 if statusErr := as.s.Status().Err(); statusErr != nil { 1207 return statusErr 1208 } 1209 return io.EOF // indicates successful end of stream. 1210 } 1211 return toRPCErr(err) 1212 } 1213 1214 if channelz.IsOn() { 1215 as.t.IncrMsgRecv() 1216 } 1217 if as.desc.ServerStreams { 1218 // Subsequent messages should be received by subsequent RecvMsg calls. 1219 return nil 1220 } 1221 1222 // Special handling for non-server-stream rpcs. 1223 // This recv expects EOF or errors, so we don't collect inPayload. 1224 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) 1225 if err == nil { 1226 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 1227 } 1228 if err == io.EOF { 1229 return as.s.Status().Err() // non-server streaming Recv returns nil on success 1230 } 1231 return toRPCErr(err) 1232} 1233 1234func (as *addrConnStream) finish(err error) { 1235 as.mu.Lock() 1236 if as.finished { 1237 as.mu.Unlock() 1238 return 1239 } 1240 as.finished = true 1241 if err == io.EOF { 1242 // Ending a stream with EOF indicates a success. 1243 err = nil 1244 } 1245 if as.s != nil { 1246 as.t.CloseStream(as.s, err) 1247 } 1248 1249 if err != nil { 1250 as.ac.incrCallsFailed() 1251 } else { 1252 as.ac.incrCallsSucceeded() 1253 } 1254 as.cancel() 1255 as.mu.Unlock() 1256} 1257 1258// ServerStream defines the server-side behavior of a streaming RPC. 1259// 1260// All errors returned from ServerStream methods are compatible with the 1261// status package. 1262type ServerStream interface { 1263 // SetHeader sets the header metadata. It may be called multiple times. 1264 // When call multiple times, all the provided metadata will be merged. 1265 // All the metadata will be sent out when one of the following happens: 1266 // - ServerStream.SendHeader() is called; 1267 // - The first response is sent out; 1268 // - An RPC status is sent out (error or success). 1269 SetHeader(metadata.MD) error 1270 // SendHeader sends the header metadata. 1271 // The provided md and headers set by SetHeader() will be sent. 1272 // It fails if called multiple times. 1273 SendHeader(metadata.MD) error 1274 // SetTrailer sets the trailer metadata which will be sent with the RPC status. 1275 // When called more than once, all the provided metadata will be merged. 1276 SetTrailer(metadata.MD) 1277 // Context returns the context for this stream. 1278 Context() context.Context 1279 // SendMsg sends a message. On error, SendMsg aborts the stream and the 1280 // error is returned directly. 1281 // 1282 // SendMsg blocks until: 1283 // - There is sufficient flow control to schedule m with the transport, or 1284 // - The stream is done, or 1285 // - The stream breaks. 1286 // 1287 // SendMsg does not wait until the message is received by the client. An 1288 // untimely stream closure may result in lost messages. 1289 // 1290 // It is safe to have a goroutine calling SendMsg and another goroutine 1291 // calling RecvMsg on the same stream at the same time, but it is not safe 1292 // to call SendMsg on the same stream in different goroutines. 1293 SendMsg(m interface{}) error 1294 // RecvMsg blocks until it receives a message into m or the stream is 1295 // done. It returns io.EOF when the client has performed a CloseSend. On 1296 // any non-EOF error, the stream is aborted and the error contains the 1297 // RPC status. 1298 // 1299 // It is safe to have a goroutine calling SendMsg and another goroutine 1300 // calling RecvMsg on the same stream at the same time, but it is not 1301 // safe to call RecvMsg on the same stream in different goroutines. 1302 RecvMsg(m interface{}) error 1303} 1304 1305// serverStream implements a server side Stream. 1306type serverStream struct { 1307 ctx context.Context 1308 t transport.ServerTransport 1309 s *transport.Stream 1310 p *parser 1311 codec baseCodec 1312 1313 cp Compressor 1314 dc Decompressor 1315 comp encoding.Compressor 1316 decomp encoding.Compressor 1317 1318 maxReceiveMessageSize int 1319 maxSendMessageSize int 1320 trInfo *traceInfo 1321 1322 statsHandler stats.Handler 1323 1324 binlog *binarylog.MethodLogger 1325 // serverHeaderBinlogged indicates whether server header has been logged. It 1326 // will happen when one of the following two happens: stream.SendHeader(), 1327 // stream.Send(). 1328 // 1329 // It's only checked in send and sendHeader, doesn't need to be 1330 // synchronized. 1331 serverHeaderBinlogged bool 1332 1333 mu sync.Mutex // protects trInfo.tr after the service handler runs. 1334} 1335 1336func (ss *serverStream) Context() context.Context { 1337 return ss.ctx 1338} 1339 1340func (ss *serverStream) SetHeader(md metadata.MD) error { 1341 if md.Len() == 0 { 1342 return nil 1343 } 1344 return ss.s.SetHeader(md) 1345} 1346 1347func (ss *serverStream) SendHeader(md metadata.MD) error { 1348 err := ss.t.WriteHeader(ss.s, md) 1349 if ss.binlog != nil && !ss.serverHeaderBinlogged { 1350 h, _ := ss.s.Header() 1351 ss.binlog.Log(&binarylog.ServerHeader{ 1352 Header: h, 1353 }) 1354 ss.serverHeaderBinlogged = true 1355 } 1356 return err 1357} 1358 1359func (ss *serverStream) SetTrailer(md metadata.MD) { 1360 if md.Len() == 0 { 1361 return 1362 } 1363 ss.s.SetTrailer(md) 1364} 1365 1366func (ss *serverStream) SendMsg(m interface{}) (err error) { 1367 defer func() { 1368 if ss.trInfo != nil { 1369 ss.mu.Lock() 1370 if ss.trInfo.tr != nil { 1371 if err == nil { 1372 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 1373 } else { 1374 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1375 ss.trInfo.tr.SetError() 1376 } 1377 } 1378 ss.mu.Unlock() 1379 } 1380 if err != nil && err != io.EOF { 1381 st, _ := status.FromError(toRPCErr(err)) 1382 ss.t.WriteStatus(ss.s, st) 1383 // Non-user specified status was sent out. This should be an error 1384 // case (as a server side Cancel maybe). 1385 // 1386 // This is not handled specifically now. User will return a final 1387 // status from the service handler, we will log that error instead. 1388 // This behavior is similar to an interceptor. 1389 } 1390 if channelz.IsOn() && err == nil { 1391 ss.t.IncrMsgSent() 1392 } 1393 }() 1394 1395 // load hdr, payload, data 1396 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) 1397 if err != nil { 1398 return err 1399 } 1400 1401 // TODO(dfawley): should we be checking len(data) instead? 1402 if len(payload) > ss.maxSendMessageSize { 1403 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) 1404 } 1405 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { 1406 return toRPCErr(err) 1407 } 1408 if ss.binlog != nil { 1409 if !ss.serverHeaderBinlogged { 1410 h, _ := ss.s.Header() 1411 ss.binlog.Log(&binarylog.ServerHeader{ 1412 Header: h, 1413 }) 1414 ss.serverHeaderBinlogged = true 1415 } 1416 ss.binlog.Log(&binarylog.ServerMessage{ 1417 Message: data, 1418 }) 1419 } 1420 if ss.statsHandler != nil { 1421 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) 1422 } 1423 return nil 1424} 1425 1426func (ss *serverStream) RecvMsg(m interface{}) (err error) { 1427 defer func() { 1428 if ss.trInfo != nil { 1429 ss.mu.Lock() 1430 if ss.trInfo.tr != nil { 1431 if err == nil { 1432 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 1433 } else if err != io.EOF { 1434 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1435 ss.trInfo.tr.SetError() 1436 } 1437 } 1438 ss.mu.Unlock() 1439 } 1440 if err != nil && err != io.EOF { 1441 st, _ := status.FromError(toRPCErr(err)) 1442 ss.t.WriteStatus(ss.s, st) 1443 // Non-user specified status was sent out. This should be an error 1444 // case (as a server side Cancel maybe). 1445 // 1446 // This is not handled specifically now. User will return a final 1447 // status from the service handler, we will log that error instead. 1448 // This behavior is similar to an interceptor. 1449 } 1450 if channelz.IsOn() && err == nil { 1451 ss.t.IncrMsgRecv() 1452 } 1453 }() 1454 var payInfo *payloadInfo 1455 if ss.statsHandler != nil || ss.binlog != nil { 1456 payInfo = &payloadInfo{} 1457 } 1458 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { 1459 if err == io.EOF { 1460 if ss.binlog != nil { 1461 ss.binlog.Log(&binarylog.ClientHalfClose{}) 1462 } 1463 return err 1464 } 1465 if err == io.ErrUnexpectedEOF { 1466 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 1467 } 1468 return toRPCErr(err) 1469 } 1470 if ss.statsHandler != nil { 1471 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{ 1472 RecvTime: time.Now(), 1473 Payload: m, 1474 // TODO truncate large payload. 1475 Data: payInfo.uncompressedBytes, 1476 WireLength: payInfo.wireLength, 1477 Length: len(payInfo.uncompressedBytes), 1478 }) 1479 } 1480 if ss.binlog != nil { 1481 ss.binlog.Log(&binarylog.ClientMessage{ 1482 Message: payInfo.uncompressedBytes, 1483 }) 1484 } 1485 return nil 1486} 1487 1488// MethodFromServerStream returns the method string for the input stream. 1489// The returned string is in the format of "/service/method". 1490func MethodFromServerStream(stream ServerStream) (string, bool) { 1491 return Method(stream.Context()) 1492} 1493 1494// prepareMsg returns the hdr, payload and data 1495// using the compressors passed or using the 1496// passed preparedmsg 1497func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { 1498 if preparedMsg, ok := m.(*PreparedMsg); ok { 1499 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil 1500 } 1501 // The input interface is not a prepared msg. 1502 // Marshal and Compress the data at this point 1503 data, err = encode(codec, m) 1504 if err != nil { 1505 return nil, nil, nil, err 1506 } 1507 compData, err := compress(data, cp, comp) 1508 if err != nil { 1509 return nil, nil, nil, err 1510 } 1511 hdr, payload = msgHeader(data, compData) 1512 return hdr, payload, data, nil 1513} 1514