/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"io"
	"math"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/balancerload"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	// StreamName is the name of the streaming method.
	StreamName string
	// Handler is the StreamHandler for this method (see StreamHandler).
	Handler StreamHandler

	// At least one of these is true.
	//
	// On the client side, ClientStreams == false makes the first SendMsg also
	// the last one, and ServerStreams == false makes RecvMsg treat a single
	// response as the end of the stream.
	ServerStreams bool
	ClientStreams bool
}

// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
//      1. Call Close on the ClientConn.
//      2. Cancel the context provided.
//      3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//         client-streaming RPC, for instance, might use the helper function
//         CloseAndRecv (note that CloseSend does not Recv, therefore is not
//         guaranteed to release all resources).
//      4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
142func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { 143 // allow interceptor to see all applicable call options, which means those 144 // configured as defaults from dial option as well as per-call options 145 opts = combine(cc.dopts.callOptions, opts) 146 147 if cc.dopts.streamInt != nil { 148 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) 149 } 150 return newClientStream(ctx, desc, cc, method, opts...) 151} 152 153// NewClientStream is a wrapper for ClientConn.NewStream. 154func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { 155 return cc.NewStream(ctx, desc, method, opts...) 156} 157 158func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { 159 if channelz.IsOn() { 160 cc.incrCallsStarted() 161 defer func() { 162 if err != nil { 163 cc.incrCallsFailed() 164 } 165 }() 166 } 167 c := defaultCallInfo() 168 // Provide an opportunity for the first RPC to see the first service config 169 // provided by the resolver. 170 if err := cc.waitForResolvedAddrs(ctx); err != nil { 171 return nil, err 172 } 173 mc := cc.GetMethodConfig(method) 174 if mc.WaitForReady != nil { 175 c.failFast = !*mc.WaitForReady 176 } 177 178 // Possible context leak: 179 // The cancel function for the child context we create will only be called 180 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if 181 // an error is generated by SendMsg. 182 // https://github.com/grpc/grpc-go/issues/1818. 
183 var cancel context.CancelFunc 184 if mc.Timeout != nil && *mc.Timeout >= 0 { 185 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) 186 } else { 187 ctx, cancel = context.WithCancel(ctx) 188 } 189 defer func() { 190 if err != nil { 191 cancel() 192 } 193 }() 194 195 for _, o := range opts { 196 if err := o.before(c); err != nil { 197 return nil, toRPCErr(err) 198 } 199 } 200 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) 201 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 202 if err := setCallInfoCodec(c); err != nil { 203 return nil, err 204 } 205 206 callHdr := &transport.CallHdr{ 207 Host: cc.authority, 208 Method: method, 209 ContentSubtype: c.contentSubtype, 210 } 211 212 // Set our outgoing compression according to the UseCompressor CallOption, if 213 // set. In that case, also find the compressor from the encoding package. 214 // Otherwise, use the compressor configured by the WithCompressor DialOption, 215 // if set. 
216 var cp Compressor 217 var comp encoding.Compressor 218 if ct := c.compressorType; ct != "" { 219 callHdr.SendCompress = ct 220 if ct != encoding.Identity { 221 comp = encoding.GetCompressor(ct) 222 if comp == nil { 223 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) 224 } 225 } 226 } else if cc.dopts.cp != nil { 227 callHdr.SendCompress = cc.dopts.cp.Type() 228 cp = cc.dopts.cp 229 } 230 if c.creds != nil { 231 callHdr.Creds = c.creds 232 } 233 var trInfo *traceInfo 234 if EnableTracing { 235 trInfo = &traceInfo{ 236 tr: trace.New("grpc.Sent."+methodFamily(method), method), 237 firstLine: firstLine{ 238 client: true, 239 }, 240 } 241 if deadline, ok := ctx.Deadline(); ok { 242 trInfo.firstLine.deadline = time.Until(deadline) 243 } 244 trInfo.tr.LazyLog(&trInfo.firstLine, false) 245 ctx = trace.NewContext(ctx, trInfo.tr) 246 } 247 ctx = newContextWithRPCInfo(ctx, c.failFast, c.codec, cp, comp) 248 sh := cc.dopts.copts.StatsHandler 249 var beginTime time.Time 250 if sh != nil { 251 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) 252 beginTime = time.Now() 253 begin := &stats.Begin{ 254 Client: true, 255 BeginTime: beginTime, 256 FailFast: c.failFast, 257 } 258 sh.HandleRPC(ctx, begin) 259 } 260 261 cs := &clientStream{ 262 callHdr: callHdr, 263 ctx: ctx, 264 methodConfig: &mc, 265 opts: opts, 266 callInfo: c, 267 cc: cc, 268 desc: desc, 269 codec: c.codec, 270 cp: cp, 271 comp: comp, 272 cancel: cancel, 273 beginTime: beginTime, 274 firstAttempt: true, 275 } 276 if !cc.dopts.disableRetry { 277 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) 278 } 279 cs.binlog = binarylog.GetMethodLogger(method) 280 281 cs.callInfo.stream = cs 282 // Only this initial attempt has stats/tracing. 283 // TODO(dfawley): move to newAttempt when per-attempt stats are implemented. 
284 if err := cs.newAttemptLocked(sh, trInfo); err != nil { 285 cs.finish(err) 286 return nil, err 287 } 288 289 op := func(a *csAttempt) error { return a.newStream() } 290 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { 291 cs.finish(err) 292 return nil, err 293 } 294 295 if cs.binlog != nil { 296 md, _ := metadata.FromOutgoingContext(ctx) 297 logEntry := &binarylog.ClientHeader{ 298 OnClientSide: true, 299 Header: md, 300 MethodName: method, 301 Authority: cs.cc.authority, 302 } 303 if deadline, ok := ctx.Deadline(); ok { 304 logEntry.Timeout = time.Until(deadline) 305 if logEntry.Timeout < 0 { 306 logEntry.Timeout = 0 307 } 308 } 309 cs.binlog.Log(logEntry) 310 } 311 312 if desc != unaryStreamDesc { 313 // Listen on cc and stream contexts to cleanup when the user closes the 314 // ClientConn or cancels the stream context. In all other cases, an error 315 // should already be injected into the recv buffer by the transport, which 316 // the client will eventually receive, and then we will cancel the stream's 317 // context in clientStream.finish. 318 go func() { 319 select { 320 case <-cc.ctx.Done(): 321 cs.finish(ErrClientConnClosing) 322 case <-ctx.Done(): 323 cs.finish(toRPCErr(ctx.Err())) 324 } 325 }() 326 } 327 return cs, nil 328} 329 330// newAttemptLocked creates a new attempt with a transport. 331// If it succeeds, then it replaces clientStream's attempt with this new attempt. 332func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo *traceInfo) (retErr error) { 333 newAttempt := &csAttempt{ 334 cs: cs, 335 dc: cs.cc.dopts.dc, 336 statsHandler: sh, 337 trInfo: trInfo, 338 } 339 defer func() { 340 if retErr != nil { 341 // This attempt is not set in the clientStream, so it's finish won't 342 // be called. Call it here for stats and trace in case they are not 343 // nil. 
344 newAttempt.finish(retErr) 345 } 346 }() 347 348 if err := cs.ctx.Err(); err != nil { 349 return toRPCErr(err) 350 } 351 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method) 352 if err != nil { 353 return err 354 } 355 if trInfo != nil { 356 trInfo.firstLine.SetRemoteAddr(t.RemoteAddr()) 357 } 358 newAttempt.t = t 359 newAttempt.done = done 360 cs.attempt = newAttempt 361 return nil 362} 363 364func (a *csAttempt) newStream() error { 365 cs := a.cs 366 cs.callHdr.PreviousAttempts = cs.numRetries 367 s, err := a.t.NewStream(cs.ctx, cs.callHdr) 368 if err != nil { 369 return toRPCErr(err) 370 } 371 cs.attempt.s = s 372 cs.attempt.p = &parser{r: s} 373 return nil 374} 375 376// clientStream implements a client side Stream. 377type clientStream struct { 378 callHdr *transport.CallHdr 379 opts []CallOption 380 callInfo *callInfo 381 cc *ClientConn 382 desc *StreamDesc 383 384 codec baseCodec 385 cp Compressor 386 comp encoding.Compressor 387 388 cancel context.CancelFunc // cancels all attempts 389 390 sentLast bool // sent an end stream 391 beginTime time.Time 392 393 methodConfig *MethodConfig 394 395 ctx context.Context // the application's context, wrapped by stats/tracing 396 397 retryThrottler *retryThrottler // The throttler active when the RPC began. 398 399 binlog *binarylog.MethodLogger // Binary logger, can be nil. 400 // serverHeaderBinlogged is a boolean for whether server header has been 401 // logged. Server header will be logged when the first time one of those 402 // happens: stream.Header(), stream.Recv(). 403 // 404 // It's only read and used by Recv() and Header(), so it doesn't need to be 405 // synchronized. 
406 serverHeaderBinlogged bool 407 408 mu sync.Mutex 409 firstAttempt bool // if true, transparent retry is valid 410 numRetries int // exclusive of transparent retry attempt(s) 411 numRetriesSincePushback int // retries since pushback; to reset backoff 412 finished bool // TODO: replace with atomic cmpxchg or sync.Once? 413 // attempt is the active client stream attempt. 414 // The only place where it is written is the newAttemptLocked method and this method never writes nil. 415 // So, attempt can be nil only inside newClientStream function when clientStream is first created. 416 // One of the first things done after clientStream's creation, is to call newAttemptLocked which either 417 // assigns a non nil value to the attempt or returns an error. If an error is returned from newAttemptLocked, 418 // then newClientStream calls finish on the clientStream and returns. So, finish method is the only 419 // place where we need to check if the attempt is nil. 420 attempt *csAttempt 421 // TODO(hedging): hedging will have multiple attempts simultaneously. 422 committed bool // active attempt committed for retry? 423 buffer []func(a *csAttempt) error // operations to replay on retry 424 bufferSize int // current size of buffer 425} 426 427// csAttempt implements a single transport stream attempt within a 428// clientStream. 429type csAttempt struct { 430 cs *clientStream 431 t transport.ClientTransport 432 s *transport.Stream 433 p *parser 434 done func(balancer.DoneInfo) 435 436 finished bool 437 dc Decompressor 438 decomp encoding.Compressor 439 decompSet bool 440 441 mu sync.Mutex // guards trInfo.tr 442 // trInfo may be nil (if EnableTracing is false). 443 // trInfo.tr is set when created (if EnableTracing is true), 444 // and cleared when the finish method is called. 
445 trInfo *traceInfo 446 447 statsHandler stats.Handler 448} 449 450func (cs *clientStream) commitAttemptLocked() { 451 cs.committed = true 452 cs.buffer = nil 453} 454 455func (cs *clientStream) commitAttempt() { 456 cs.mu.Lock() 457 cs.commitAttemptLocked() 458 cs.mu.Unlock() 459} 460 461// shouldRetry returns nil if the RPC should be retried; otherwise it returns 462// the error that should be returned by the operation. 463func (cs *clientStream) shouldRetry(err error) error { 464 if cs.attempt.s == nil && !cs.callInfo.failFast { 465 // In the event of any error from NewStream (attempt.s == nil), we 466 // never attempted to write anything to the wire, so we can retry 467 // indefinitely for non-fail-fast RPCs. 468 return nil 469 } 470 if cs.finished || cs.committed { 471 // RPC is finished or committed; cannot retry. 472 return err 473 } 474 // Wait for the trailers. 475 if cs.attempt.s != nil { 476 <-cs.attempt.s.Done() 477 } 478 if cs.firstAttempt && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { 479 // First attempt, stream unprocessed: transparently retry. 480 cs.firstAttempt = false 481 return nil 482 } 483 cs.firstAttempt = false 484 if cs.cc.dopts.disableRetry { 485 return err 486 } 487 488 pushback := 0 489 hasPushback := false 490 if cs.attempt.s != nil { 491 if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to { 492 return err 493 } 494 495 // TODO(retry): Move down if the spec changes to not check server pushback 496 // before considering this a failure for throttling. 497 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] 498 if len(sps) == 1 { 499 var e error 500 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { 501 grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0]) 502 cs.retryThrottler.throttle() // This counts as a failure for throttling. 
503 return err 504 } 505 hasPushback = true 506 } else if len(sps) > 1 { 507 grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps) 508 cs.retryThrottler.throttle() // This counts as a failure for throttling. 509 return err 510 } 511 } 512 513 var code codes.Code 514 if cs.attempt.s != nil { 515 code = cs.attempt.s.Status().Code() 516 } else { 517 code = status.Convert(err).Code() 518 } 519 520 rp := cs.methodConfig.retryPolicy 521 if rp == nil || !rp.retryableStatusCodes[code] { 522 return err 523 } 524 525 // Note: the ordering here is important; we count this as a failure 526 // only if the code matched a retryable code. 527 if cs.retryThrottler.throttle() { 528 return err 529 } 530 if cs.numRetries+1 >= rp.maxAttempts { 531 return err 532 } 533 534 var dur time.Duration 535 if hasPushback { 536 dur = time.Millisecond * time.Duration(pushback) 537 cs.numRetriesSincePushback = 0 538 } else { 539 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback)) 540 cur := float64(rp.initialBackoff) * fact 541 if max := float64(rp.maxBackoff); cur > max { 542 cur = max 543 } 544 dur = time.Duration(grpcrand.Int63n(int64(cur))) 545 cs.numRetriesSincePushback++ 546 } 547 548 // TODO(dfawley): we could eagerly fail here if dur puts us past the 549 // deadline, but unsure if it is worth doing. 550 t := time.NewTimer(dur) 551 select { 552 case <-t.C: 553 cs.numRetries++ 554 return nil 555 case <-cs.ctx.Done(): 556 t.Stop() 557 return status.FromContextError(cs.ctx.Err()).Err() 558 } 559} 560 561// Returns nil if a retry was performed and succeeded; error otherwise. 
562func (cs *clientStream) retryLocked(lastErr error) error { 563 for { 564 cs.attempt.finish(lastErr) 565 if err := cs.shouldRetry(lastErr); err != nil { 566 cs.commitAttemptLocked() 567 return err 568 } 569 if err := cs.newAttemptLocked(nil, nil); err != nil { 570 return err 571 } 572 if lastErr = cs.replayBufferLocked(); lastErr == nil { 573 return nil 574 } 575 } 576} 577 578func (cs *clientStream) Context() context.Context { 579 cs.commitAttempt() 580 // No need to lock before using attempt, since we know it is committed and 581 // cannot change. 582 return cs.attempt.s.Context() 583} 584 585func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { 586 cs.mu.Lock() 587 for { 588 if cs.committed { 589 cs.mu.Unlock() 590 return op(cs.attempt) 591 } 592 a := cs.attempt 593 cs.mu.Unlock() 594 err := op(a) 595 cs.mu.Lock() 596 if a != cs.attempt { 597 // We started another attempt already. 598 continue 599 } 600 if err == io.EOF { 601 <-a.s.Done() 602 } 603 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { 604 onSuccess() 605 cs.mu.Unlock() 606 return err 607 } 608 if err := cs.retryLocked(err); err != nil { 609 cs.mu.Unlock() 610 return err 611 } 612 } 613} 614 615func (cs *clientStream) Header() (metadata.MD, error) { 616 var m metadata.MD 617 err := cs.withRetry(func(a *csAttempt) error { 618 var err error 619 m, err = a.s.Header() 620 return toRPCErr(err) 621 }, cs.commitAttemptLocked) 622 if err != nil { 623 cs.finish(err) 624 return nil, err 625 } 626 if cs.binlog != nil && !cs.serverHeaderBinlogged { 627 // Only log if binary log is on and header has not been logged. 
628 logEntry := &binarylog.ServerHeader{ 629 OnClientSide: true, 630 Header: m, 631 PeerAddr: nil, 632 } 633 if peer, ok := peer.FromContext(cs.Context()); ok { 634 logEntry.PeerAddr = peer.Addr 635 } 636 cs.binlog.Log(logEntry) 637 cs.serverHeaderBinlogged = true 638 } 639 return m, err 640} 641 642func (cs *clientStream) Trailer() metadata.MD { 643 // On RPC failure, we never need to retry, because usage requires that 644 // RecvMsg() returned a non-nil error before calling this function is valid. 645 // We would have retried earlier if necessary. 646 // 647 // Commit the attempt anyway, just in case users are not following those 648 // directions -- it will prevent races and should not meaningfully impact 649 // performance. 650 cs.commitAttempt() 651 if cs.attempt.s == nil { 652 return nil 653 } 654 return cs.attempt.s.Trailer() 655} 656 657func (cs *clientStream) replayBufferLocked() error { 658 a := cs.attempt 659 for _, f := range cs.buffer { 660 if err := f(a); err != nil { 661 return err 662 } 663 } 664 return nil 665} 666 667func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) { 668 // Note: we still will buffer if retry is disabled (for transparent retries). 669 if cs.committed { 670 return 671 } 672 cs.bufferSize += sz 673 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { 674 cs.commitAttemptLocked() 675 return 676 } 677 cs.buffer = append(cs.buffer, op) 678} 679 680func (cs *clientStream) SendMsg(m interface{}) (err error) { 681 defer func() { 682 if err != nil && err != io.EOF { 683 // Call finish on the client stream for errors generated by this SendMsg 684 // call, as these indicate problems created by this client. (Transport 685 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real 686 // error will be returned from RecvMsg eventually in that case, or be 687 // retried.) 
688 cs.finish(err) 689 } 690 }() 691 if cs.sentLast { 692 return status.Errorf(codes.Internal, "SendMsg called after CloseSend") 693 } 694 if !cs.desc.ClientStreams { 695 cs.sentLast = true 696 } 697 698 // load hdr, payload, data 699 hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp) 700 if err != nil { 701 return err 702 } 703 704 // TODO(dfawley): should we be checking len(data) instead? 705 if len(payload) > *cs.callInfo.maxSendMessageSize { 706 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) 707 } 708 msgBytes := data // Store the pointer before setting to nil. For binary logging. 709 op := func(a *csAttempt) error { 710 err := a.sendMsg(m, hdr, payload, data) 711 // nil out the message and uncomp when replaying; they are only needed for 712 // stats which is disabled for subsequent attempts. 713 m, data = nil, nil 714 return err 715 } 716 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) 717 if cs.binlog != nil && err == nil { 718 cs.binlog.Log(&binarylog.ClientMessage{ 719 OnClientSide: true, 720 Message: msgBytes, 721 }) 722 } 723 return 724} 725 726func (cs *clientStream) RecvMsg(m interface{}) error { 727 if cs.binlog != nil && !cs.serverHeaderBinlogged { 728 // Call Header() to binary log header if it's not already logged. 729 cs.Header() 730 } 731 var recvInfo *payloadInfo 732 if cs.binlog != nil { 733 recvInfo = &payloadInfo{} 734 } 735 err := cs.withRetry(func(a *csAttempt) error { 736 return a.recvMsg(m, recvInfo) 737 }, cs.commitAttemptLocked) 738 if cs.binlog != nil && err == nil { 739 cs.binlog.Log(&binarylog.ServerMessage{ 740 OnClientSide: true, 741 Message: recvInfo.uncompressedBytes, 742 }) 743 } 744 if err != nil || !cs.desc.ServerStreams { 745 // err != nil or non-server-streaming indicates end of stream. 746 cs.finish(err) 747 748 if cs.binlog != nil { 749 // finish will not log Trailer. 
Log Trailer here. 750 logEntry := &binarylog.ServerTrailer{ 751 OnClientSide: true, 752 Trailer: cs.Trailer(), 753 Err: err, 754 } 755 if logEntry.Err == io.EOF { 756 logEntry.Err = nil 757 } 758 if peer, ok := peer.FromContext(cs.Context()); ok { 759 logEntry.PeerAddr = peer.Addr 760 } 761 cs.binlog.Log(logEntry) 762 } 763 } 764 return err 765} 766 767func (cs *clientStream) CloseSend() error { 768 if cs.sentLast { 769 // TODO: return an error and finish the stream instead, due to API misuse? 770 return nil 771 } 772 cs.sentLast = true 773 op := func(a *csAttempt) error { 774 a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) 775 // Always return nil; io.EOF is the only error that might make sense 776 // instead, but there is no need to signal the client to call RecvMsg 777 // as the only use left for the stream after CloseSend is to call 778 // RecvMsg. This also matches historical behavior. 779 return nil 780 } 781 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }) 782 if cs.binlog != nil { 783 cs.binlog.Log(&binarylog.ClientHalfClose{ 784 OnClientSide: true, 785 }) 786 } 787 // We never returned an error here for reasons. 788 return nil 789} 790 791func (cs *clientStream) finish(err error) { 792 if err == io.EOF { 793 // Ending a stream with EOF indicates a success. 794 err = nil 795 } 796 cs.mu.Lock() 797 if cs.finished { 798 cs.mu.Unlock() 799 return 800 } 801 cs.finished = true 802 cs.commitAttemptLocked() 803 cs.mu.Unlock() 804 // For binary logging. only log cancel in finish (could be caused by RPC ctx 805 // canceled or ClientConn closed). Trailer will be logged in RecvMsg. 806 // 807 // Only one of cancel or trailer needs to be logged. In the cases where 808 // users don't call RecvMsg, users must have already canceled the RPC. 
809 if cs.binlog != nil && status.Code(err) == codes.Canceled { 810 cs.binlog.Log(&binarylog.Cancel{ 811 OnClientSide: true, 812 }) 813 } 814 if err == nil { 815 cs.retryThrottler.successfulRPC() 816 } 817 if channelz.IsOn() { 818 if err != nil { 819 cs.cc.incrCallsFailed() 820 } else { 821 cs.cc.incrCallsSucceeded() 822 } 823 } 824 if cs.attempt != nil { 825 cs.attempt.finish(err) 826 // after functions all rely upon having a stream. 827 if cs.attempt.s != nil { 828 for _, o := range cs.opts { 829 o.after(cs.callInfo) 830 } 831 } 832 } 833 cs.cancel() 834} 835 836func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { 837 cs := a.cs 838 if a.trInfo != nil { 839 a.mu.Lock() 840 if a.trInfo.tr != nil { 841 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 842 } 843 a.mu.Unlock() 844 } 845 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { 846 if !cs.desc.ClientStreams { 847 // For non-client-streaming RPCs, we return nil instead of EOF on error 848 // because the generated code requires it. finish is not called; RecvMsg() 849 // will call it with the stream's status independently. 850 return nil 851 } 852 return io.EOF 853 } 854 if a.statsHandler != nil { 855 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) 856 } 857 if channelz.IsOn() { 858 a.t.IncrMsgSent() 859 } 860 return nil 861} 862 863func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { 864 cs := a.cs 865 if a.statsHandler != nil && payInfo == nil { 866 payInfo = &payloadInfo{} 867 } 868 869 if !a.decompSet { 870 // Block until we receive headers containing received message encoding. 871 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { 872 if a.dc == nil || a.dc.Type() != ct { 873 // No configured decompressor, or it does not match the incoming 874 // message encoding; attempt to find a registered compressor that does. 
875 a.dc = nil 876 a.decomp = encoding.GetCompressor(ct) 877 } 878 } else { 879 // No compression is used; disable our decompressor. 880 a.dc = nil 881 } 882 // Only initialize this state once per stream. 883 a.decompSet = true 884 } 885 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp) 886 if err != nil { 887 if err == io.EOF { 888 if statusErr := a.s.Status().Err(); statusErr != nil { 889 return statusErr 890 } 891 return io.EOF // indicates successful end of stream. 892 } 893 return toRPCErr(err) 894 } 895 if a.trInfo != nil { 896 a.mu.Lock() 897 if a.trInfo.tr != nil { 898 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 899 } 900 a.mu.Unlock() 901 } 902 if a.statsHandler != nil { 903 a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{ 904 Client: true, 905 RecvTime: time.Now(), 906 Payload: m, 907 // TODO truncate large payload. 908 Data: payInfo.uncompressedBytes, 909 WireLength: payInfo.wireLength, 910 Length: len(payInfo.uncompressedBytes), 911 }) 912 } 913 if channelz.IsOn() { 914 a.t.IncrMsgRecv() 915 } 916 if cs.desc.ServerStreams { 917 // Subsequent messages should be received by subsequent RecvMsg calls. 918 return nil 919 } 920 // Special handling for non-server-stream rpcs. 921 // This recv expects EOF or errors, so we don't collect inPayload. 922 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) 923 if err == nil { 924 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 925 } 926 if err == io.EOF { 927 return a.s.Status().Err() // non-server streaming Recv returns nil on success 928 } 929 return toRPCErr(err) 930} 931 932func (a *csAttempt) finish(err error) { 933 a.mu.Lock() 934 if a.finished { 935 a.mu.Unlock() 936 return 937 } 938 a.finished = true 939 if err == io.EOF { 940 // Ending a stream with EOF indicates a success. 
941 err = nil 942 } 943 var tr metadata.MD 944 if a.s != nil { 945 a.t.CloseStream(a.s, err) 946 tr = a.s.Trailer() 947 } 948 949 if a.done != nil { 950 br := false 951 if a.s != nil { 952 br = a.s.BytesReceived() 953 } 954 a.done(balancer.DoneInfo{ 955 Err: err, 956 Trailer: tr, 957 BytesSent: a.s != nil, 958 BytesReceived: br, 959 ServerLoad: balancerload.Parse(tr), 960 }) 961 } 962 if a.statsHandler != nil { 963 end := &stats.End{ 964 Client: true, 965 BeginTime: a.cs.beginTime, 966 EndTime: time.Now(), 967 Trailer: tr, 968 Error: err, 969 } 970 a.statsHandler.HandleRPC(a.cs.ctx, end) 971 } 972 if a.trInfo != nil && a.trInfo.tr != nil { 973 if err == nil { 974 a.trInfo.tr.LazyPrintf("RPC: [OK]") 975 } else { 976 a.trInfo.tr.LazyPrintf("RPC: [%v]", err) 977 a.trInfo.tr.SetError() 978 } 979 a.trInfo.tr.Finish() 980 a.trInfo.tr = nil 981 } 982 a.mu.Unlock() 983} 984 985// newClientStream creates a ClientStream with the specified transport, on the 986// given addrConn. 987// 988// It's expected that the given transport is either the same one in addrConn, or 989// is already closed. To avoid race, transport is specified separately, instead 990// of using ac.transpot. 991// 992// Main difference between this and ClientConn.NewStream: 993// - no retry 994// - no service config (or wait for service config) 995// - no tracing or stats 996func newNonRetryClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, ac *addrConn, opts ...CallOption) (_ ClientStream, err error) { 997 if t == nil { 998 // TODO: return RPC error here? 999 return nil, errors.New("transport provided is nil") 1000 } 1001 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct. 
1002 c := &callInfo{} 1003 1004 // Possible context leak: 1005 // The cancel function for the child context we create will only be called 1006 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if 1007 // an error is generated by SendMsg. 1008 // https://github.com/grpc/grpc-go/issues/1818. 1009 ctx, cancel := context.WithCancel(ctx) 1010 defer func() { 1011 if err != nil { 1012 cancel() 1013 } 1014 }() 1015 1016 for _, o := range opts { 1017 if err := o.before(c); err != nil { 1018 return nil, toRPCErr(err) 1019 } 1020 } 1021 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 1022 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize) 1023 if err := setCallInfoCodec(c); err != nil { 1024 return nil, err 1025 } 1026 1027 callHdr := &transport.CallHdr{ 1028 Host: ac.cc.authority, 1029 Method: method, 1030 ContentSubtype: c.contentSubtype, 1031 } 1032 1033 // Set our outgoing compression according to the UseCompressor CallOption, if 1034 // set. In that case, also find the compressor from the encoding package. 1035 // Otherwise, use the compressor configured by the WithCompressor DialOption, 1036 // if set. 1037 var cp Compressor 1038 var comp encoding.Compressor 1039 if ct := c.compressorType; ct != "" { 1040 callHdr.SendCompress = ct 1041 if ct != encoding.Identity { 1042 comp = encoding.GetCompressor(ct) 1043 if comp == nil { 1044 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) 1045 } 1046 } 1047 } else if ac.cc.dopts.cp != nil { 1048 callHdr.SendCompress = ac.cc.dopts.cp.Type() 1049 cp = ac.cc.dopts.cp 1050 } 1051 if c.creds != nil { 1052 callHdr.Creds = c.creds 1053 } 1054 1055 // Use a special addrConnStream to avoid retry. 
1056 as := &addrConnStream{ 1057 callHdr: callHdr, 1058 ac: ac, 1059 ctx: ctx, 1060 cancel: cancel, 1061 opts: opts, 1062 callInfo: c, 1063 desc: desc, 1064 codec: c.codec, 1065 cp: cp, 1066 comp: comp, 1067 t: t, 1068 } 1069 1070 as.callInfo.stream = as 1071 s, err := as.t.NewStream(as.ctx, as.callHdr) 1072 if err != nil { 1073 err = toRPCErr(err) 1074 return nil, err 1075 } 1076 as.s = s 1077 as.p = &parser{r: s} 1078 ac.incrCallsStarted() 1079 if desc != unaryStreamDesc { 1080 // Listen on cc and stream contexts to cleanup when the user closes the 1081 // ClientConn or cancels the stream context. In all other cases, an error 1082 // should already be injected into the recv buffer by the transport, which 1083 // the client will eventually receive, and then we will cancel the stream's 1084 // context in clientStream.finish. 1085 go func() { 1086 select { 1087 case <-ac.ctx.Done(): 1088 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing")) 1089 case <-ctx.Done(): 1090 as.finish(toRPCErr(ctx.Err())) 1091 } 1092 }() 1093 } 1094 return as, nil 1095} 1096 1097type addrConnStream struct { 1098 s *transport.Stream 1099 ac *addrConn 1100 callHdr *transport.CallHdr 1101 cancel context.CancelFunc 1102 opts []CallOption 1103 callInfo *callInfo 1104 t transport.ClientTransport 1105 ctx context.Context 1106 sentLast bool 1107 desc *StreamDesc 1108 codec baseCodec 1109 cp Compressor 1110 comp encoding.Compressor 1111 decompSet bool 1112 dc Decompressor 1113 decomp encoding.Compressor 1114 p *parser 1115 mu sync.Mutex 1116 finished bool 1117} 1118 1119func (as *addrConnStream) Header() (metadata.MD, error) { 1120 m, err := as.s.Header() 1121 if err != nil { 1122 as.finish(toRPCErr(err)) 1123 } 1124 return m, err 1125} 1126 1127func (as *addrConnStream) Trailer() metadata.MD { 1128 return as.s.Trailer() 1129} 1130 1131func (as *addrConnStream) CloseSend() error { 1132 if as.sentLast { 1133 // TODO: return an error and finish the stream instead, due to API 
misuse? 1134 return nil 1135 } 1136 as.sentLast = true 1137 1138 as.t.Write(as.s, nil, nil, &transport.Options{Last: true}) 1139 // Always return nil; io.EOF is the only error that might make sense 1140 // instead, but there is no need to signal the client to call RecvMsg 1141 // as the only use left for the stream after CloseSend is to call 1142 // RecvMsg. This also matches historical behavior. 1143 return nil 1144} 1145 1146func (as *addrConnStream) Context() context.Context { 1147 return as.s.Context() 1148} 1149 1150func (as *addrConnStream) SendMsg(m interface{}) (err error) { 1151 defer func() { 1152 if err != nil && err != io.EOF { 1153 // Call finish on the client stream for errors generated by this SendMsg 1154 // call, as these indicate problems created by this client. (Transport 1155 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real 1156 // error will be returned from RecvMsg eventually in that case, or be 1157 // retried.) 1158 as.finish(err) 1159 } 1160 }() 1161 if as.sentLast { 1162 return status.Errorf(codes.Internal, "SendMsg called after CloseSend") 1163 } 1164 if !as.desc.ClientStreams { 1165 as.sentLast = true 1166 } 1167 1168 // load hdr, payload, data 1169 hdr, payld, _, err := prepareMsg(m, as.codec, as.cp, as.comp) 1170 if err != nil { 1171 return err 1172 } 1173 1174 // TODO(dfawley): should we be checking len(data) instead? 1175 if len(payld) > *as.callInfo.maxSendMessageSize { 1176 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) 1177 } 1178 1179 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { 1180 if !as.desc.ClientStreams { 1181 // For non-client-streaming RPCs, we return nil instead of EOF on error 1182 // because the generated code requires it. finish is not called; RecvMsg() 1183 // will call it with the stream's status independently. 
1184 return nil 1185 } 1186 return io.EOF 1187 } 1188 1189 if channelz.IsOn() { 1190 as.t.IncrMsgSent() 1191 } 1192 return nil 1193} 1194 1195func (as *addrConnStream) RecvMsg(m interface{}) (err error) { 1196 defer func() { 1197 if err != nil || !as.desc.ServerStreams { 1198 // err != nil or non-server-streaming indicates end of stream. 1199 as.finish(err) 1200 } 1201 }() 1202 1203 if !as.decompSet { 1204 // Block until we receive headers containing received message encoding. 1205 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity { 1206 if as.dc == nil || as.dc.Type() != ct { 1207 // No configured decompressor, or it does not match the incoming 1208 // message encoding; attempt to find a registered compressor that does. 1209 as.dc = nil 1210 as.decomp = encoding.GetCompressor(ct) 1211 } 1212 } else { 1213 // No compression is used; disable our decompressor. 1214 as.dc = nil 1215 } 1216 // Only initialize this state once per stream. 1217 as.decompSet = true 1218 } 1219 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) 1220 if err != nil { 1221 if err == io.EOF { 1222 if statusErr := as.s.Status().Err(); statusErr != nil { 1223 return statusErr 1224 } 1225 return io.EOF // indicates successful end of stream. 1226 } 1227 return toRPCErr(err) 1228 } 1229 1230 if channelz.IsOn() { 1231 as.t.IncrMsgRecv() 1232 } 1233 if as.desc.ServerStreams { 1234 // Subsequent messages should be received by subsequent RecvMsg calls. 1235 return nil 1236 } 1237 1238 // Special handling for non-server-stream rpcs. 1239 // This recv expects EOF or errors, so we don't collect inPayload. 
1240 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) 1241 if err == nil { 1242 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 1243 } 1244 if err == io.EOF { 1245 return as.s.Status().Err() // non-server streaming Recv returns nil on success 1246 } 1247 return toRPCErr(err) 1248} 1249 1250func (as *addrConnStream) finish(err error) { 1251 as.mu.Lock() 1252 if as.finished { 1253 as.mu.Unlock() 1254 return 1255 } 1256 as.finished = true 1257 if err == io.EOF { 1258 // Ending a stream with EOF indicates a success. 1259 err = nil 1260 } 1261 if as.s != nil { 1262 as.t.CloseStream(as.s, err) 1263 } 1264 1265 if err != nil { 1266 as.ac.incrCallsFailed() 1267 } else { 1268 as.ac.incrCallsSucceeded() 1269 } 1270 as.cancel() 1271 as.mu.Unlock() 1272} 1273 1274// ServerStream defines the server-side behavior of a streaming RPC. 1275// 1276// All errors returned from ServerStream methods are compatible with the 1277// status package. 1278type ServerStream interface { 1279 // SetHeader sets the header metadata. It may be called multiple times. 1280 // When call multiple times, all the provided metadata will be merged. 1281 // All the metadata will be sent out when one of the following happens: 1282 // - ServerStream.SendHeader() is called; 1283 // - The first response is sent out; 1284 // - An RPC status is sent out (error or success). 1285 SetHeader(metadata.MD) error 1286 // SendHeader sends the header metadata. 1287 // The provided md and headers set by SetHeader() will be sent. 1288 // It fails if called multiple times. 1289 SendHeader(metadata.MD) error 1290 // SetTrailer sets the trailer metadata which will be sent with the RPC status. 1291 // When called more than once, all the provided metadata will be merged. 1292 SetTrailer(metadata.MD) 1293 // Context returns the context for this stream. 1294 Context() context.Context 1295 // SendMsg sends a message. 
On error, SendMsg aborts the stream and the 1296 // error is returned directly. 1297 // 1298 // SendMsg blocks until: 1299 // - There is sufficient flow control to schedule m with the transport, or 1300 // - The stream is done, or 1301 // - The stream breaks. 1302 // 1303 // SendMsg does not wait until the message is received by the client. An 1304 // untimely stream closure may result in lost messages. 1305 // 1306 // It is safe to have a goroutine calling SendMsg and another goroutine 1307 // calling RecvMsg on the same stream at the same time, but it is not safe 1308 // to call SendMsg on the same stream in different goroutines. 1309 SendMsg(m interface{}) error 1310 // RecvMsg blocks until it receives a message into m or the stream is 1311 // done. It returns io.EOF when the client has performed a CloseSend. On 1312 // any non-EOF error, the stream is aborted and the error contains the 1313 // RPC status. 1314 // 1315 // It is safe to have a goroutine calling SendMsg and another goroutine 1316 // calling RecvMsg on the same stream at the same time, but it is not 1317 // safe to call RecvMsg on the same stream in different goroutines. 1318 RecvMsg(m interface{}) error 1319} 1320 1321// serverStream implements a server side Stream. 1322type serverStream struct { 1323 ctx context.Context 1324 t transport.ServerTransport 1325 s *transport.Stream 1326 p *parser 1327 codec baseCodec 1328 1329 cp Compressor 1330 dc Decompressor 1331 comp encoding.Compressor 1332 decomp encoding.Compressor 1333 1334 maxReceiveMessageSize int 1335 maxSendMessageSize int 1336 trInfo *traceInfo 1337 1338 statsHandler stats.Handler 1339 1340 binlog *binarylog.MethodLogger 1341 // serverHeaderBinlogged indicates whether server header has been logged. It 1342 // will happen when one of the following two happens: stream.SendHeader(), 1343 // stream.Send(). 1344 // 1345 // It's only checked in send and sendHeader, doesn't need to be 1346 // synchronized. 
1347 serverHeaderBinlogged bool 1348 1349 mu sync.Mutex // protects trInfo.tr after the service handler runs. 1350} 1351 1352func (ss *serverStream) Context() context.Context { 1353 return ss.ctx 1354} 1355 1356func (ss *serverStream) SetHeader(md metadata.MD) error { 1357 if md.Len() == 0 { 1358 return nil 1359 } 1360 return ss.s.SetHeader(md) 1361} 1362 1363func (ss *serverStream) SendHeader(md metadata.MD) error { 1364 err := ss.t.WriteHeader(ss.s, md) 1365 if ss.binlog != nil && !ss.serverHeaderBinlogged { 1366 h, _ := ss.s.Header() 1367 ss.binlog.Log(&binarylog.ServerHeader{ 1368 Header: h, 1369 }) 1370 ss.serverHeaderBinlogged = true 1371 } 1372 return err 1373} 1374 1375func (ss *serverStream) SetTrailer(md metadata.MD) { 1376 if md.Len() == 0 { 1377 return 1378 } 1379 ss.s.SetTrailer(md) 1380} 1381 1382func (ss *serverStream) SendMsg(m interface{}) (err error) { 1383 defer func() { 1384 if ss.trInfo != nil { 1385 ss.mu.Lock() 1386 if ss.trInfo.tr != nil { 1387 if err == nil { 1388 ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 1389 } else { 1390 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1391 ss.trInfo.tr.SetError() 1392 } 1393 } 1394 ss.mu.Unlock() 1395 } 1396 if err != nil && err != io.EOF { 1397 st, _ := status.FromError(toRPCErr(err)) 1398 ss.t.WriteStatus(ss.s, st) 1399 // Non-user specified status was sent out. This should be an error 1400 // case (as a server side Cancel maybe). 1401 // 1402 // This is not handled specifically now. User will return a final 1403 // status from the service handler, we will log that error instead. 1404 // This behavior is similar to an interceptor. 1405 } 1406 if channelz.IsOn() && err == nil { 1407 ss.t.IncrMsgSent() 1408 } 1409 }() 1410 1411 // load hdr, payload, data 1412 hdr, payload, data, err := prepareMsg(m, ss.codec, ss.cp, ss.comp) 1413 if err != nil { 1414 return err 1415 } 1416 1417 // TODO(dfawley): should we be checking len(data) instead? 
1418 if len(payload) > ss.maxSendMessageSize { 1419 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) 1420 } 1421 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { 1422 return toRPCErr(err) 1423 } 1424 if ss.binlog != nil { 1425 if !ss.serverHeaderBinlogged { 1426 h, _ := ss.s.Header() 1427 ss.binlog.Log(&binarylog.ServerHeader{ 1428 Header: h, 1429 }) 1430 ss.serverHeaderBinlogged = true 1431 } 1432 ss.binlog.Log(&binarylog.ServerMessage{ 1433 Message: data, 1434 }) 1435 } 1436 if ss.statsHandler != nil { 1437 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) 1438 } 1439 return nil 1440} 1441 1442func (ss *serverStream) RecvMsg(m interface{}) (err error) { 1443 defer func() { 1444 if ss.trInfo != nil { 1445 ss.mu.Lock() 1446 if ss.trInfo.tr != nil { 1447 if err == nil { 1448 ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 1449 } else if err != io.EOF { 1450 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1451 ss.trInfo.tr.SetError() 1452 } 1453 } 1454 ss.mu.Unlock() 1455 } 1456 if err != nil && err != io.EOF { 1457 st, _ := status.FromError(toRPCErr(err)) 1458 ss.t.WriteStatus(ss.s, st) 1459 // Non-user specified status was sent out. This should be an error 1460 // case (as a server side Cancel maybe). 1461 // 1462 // This is not handled specifically now. User will return a final 1463 // status from the service handler, we will log that error instead. 1464 // This behavior is similar to an interceptor. 
1465 } 1466 if channelz.IsOn() && err == nil { 1467 ss.t.IncrMsgRecv() 1468 } 1469 }() 1470 var payInfo *payloadInfo 1471 if ss.statsHandler != nil || ss.binlog != nil { 1472 payInfo = &payloadInfo{} 1473 } 1474 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { 1475 if err == io.EOF { 1476 if ss.binlog != nil { 1477 ss.binlog.Log(&binarylog.ClientHalfClose{}) 1478 } 1479 return err 1480 } 1481 if err == io.ErrUnexpectedEOF { 1482 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 1483 } 1484 return toRPCErr(err) 1485 } 1486 if ss.statsHandler != nil { 1487 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{ 1488 RecvTime: time.Now(), 1489 Payload: m, 1490 // TODO truncate large payload. 1491 Data: payInfo.uncompressedBytes, 1492 WireLength: payInfo.wireLength, 1493 Length: len(payInfo.uncompressedBytes), 1494 }) 1495 } 1496 if ss.binlog != nil { 1497 ss.binlog.Log(&binarylog.ClientMessage{ 1498 Message: payInfo.uncompressedBytes, 1499 }) 1500 } 1501 return nil 1502} 1503 1504// MethodFromServerStream returns the method string for the input stream. 1505// The returned string is in the format of "/service/method". 1506func MethodFromServerStream(stream ServerStream) (string, bool) { 1507 return Method(stream.Context()) 1508} 1509 1510// prepareMsg returns the hdr, payload and data 1511// using the compressors passed or using the 1512// passed preparedmsg 1513func prepareMsg(m interface{}, codec baseCodec, cp Compressor, comp encoding.Compressor) (hdr, payload, data []byte, err error) { 1514 if preparedMsg, ok := m.(*PreparedMsg); ok { 1515 return preparedMsg.hdr, preparedMsg.payload, preparedMsg.encodedData, nil 1516 } 1517 // The input interface is not a prepared msg. 
1518 // Marshal and Compress the data at this point 1519 data, err = encode(codec, m) 1520 if err != nil { 1521 return nil, nil, nil, err 1522 } 1523 compData, err := compress(data, cp, comp) 1524 if err != nil { 1525 return nil, nil, nil, err 1526 } 1527 hdr, payload = msgHeader(data, compData) 1528 return hdr, payload, data, nil 1529} 1530