1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "bytes" 23 "errors" 24 "fmt" 25 "runtime" 26 "strconv" 27 "sync" 28 "sync/atomic" 29 30 "golang.org/x/net/http2" 31 "golang.org/x/net/http2/hpack" 32 "google.golang.org/grpc/internal/grpcutil" 33 "google.golang.org/grpc/status" 34) 35 36var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { 37 e.SetMaxDynamicTableSizeLimit(v) 38} 39 40type itemNode struct { 41 it interface{} 42 next *itemNode 43} 44 45type itemList struct { 46 head *itemNode 47 tail *itemNode 48} 49 50func (il *itemList) enqueue(i interface{}) { 51 n := &itemNode{it: i} 52 if il.tail == nil { 53 il.head, il.tail = n, n 54 return 55 } 56 il.tail.next = n 57 il.tail = n 58} 59 60// peek returns the first item in the list without removing it from the 61// list. 62func (il *itemList) peek() interface{} { 63 return il.head.it 64} 65 66func (il *itemList) dequeue() interface{} { 67 if il.head == nil { 68 return nil 69 } 70 i := il.head.it 71 il.head = il.head.next 72 if il.head == nil { 73 il.tail = nil 74 } 75 return i 76} 77 78func (il *itemList) dequeueAll() *itemNode { 79 h := il.head 80 il.head, il.tail = nil, nil 81 return h 82} 83 84func (il *itemList) isEmpty() bool { 85 return il.head == nil 86} 87 88// The following defines various control items which could flow through 89// the control buffer of transport. They represent different aspects of 90// control tasks, e.g., flow control, settings, streaming resetting, etc. 91 92// maxQueuedTransportResponseFrames is the most queued "transport response" 93// frames we will buffer before preventing new reads from occurring on the 94// transport. These are control frames sent in response to client requests, 95// such as RST_STREAM due to bad headers or settings acks. 96const maxQueuedTransportResponseFrames = 50 97 98type cbItem interface { 99 isTransportResponseFrame() bool 100} 101 102// registerStream is used to register an incoming stream with loopy writer. 103type registerStream struct { 104 streamID uint32 105 wq *writeQuota 106} 107 108func (*registerStream) isTransportResponseFrame() bool { return false } 109 110// headerFrame is also used to register stream on the client-side. 111type headerFrame struct { 112 streamID uint32 113 hf []hpack.HeaderField 114 endStream bool // Valid on server side. 115 initStream func(uint32) error // Used only on the client side. 116 onWrite func() 117 wq *writeQuota // write quota for the stream created. 118 cleanup *cleanupStream // Valid on the server side. 119 onOrphaned func(error) // Valid on client-side 120} 121 122func (h *headerFrame) isTransportResponseFrame() bool { 123 return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM 124} 125 126type cleanupStream struct { 127 streamID uint32 128 rst bool 129 rstCode http2.ErrCode 130 onWrite func() 131} 132 133func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM 134 135type earlyAbortStream struct { 136 streamID uint32 137 contentSubtype string 138 status *status.Status 139} 140 141func (*earlyAbortStream) isTransportResponseFrame() bool { return false } 142 143type dataFrame struct { 144 streamID uint32 145 endStream bool 146 h []byte 147 d []byte 148 // onEachWrite is called every time 149 // a part of d is written out. 150 onEachWrite func() 151} 152 153func (*dataFrame) isTransportResponseFrame() bool { return false } 154 155type incomingWindowUpdate struct { 156 streamID uint32 157 increment uint32 158} 159 160func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false } 161 162type outgoingWindowUpdate struct { 163 streamID uint32 164 increment uint32 165} 166 167func (*outgoingWindowUpdate) isTransportResponseFrame() bool { 168 return false // window updates are throttled by thresholds 169} 170 171type incomingSettings struct { 172 ss []http2.Setting 173} 174 175func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK 176 177type outgoingSettings struct { 178 ss []http2.Setting 179} 180 181func (*outgoingSettings) isTransportResponseFrame() bool { return false } 182 183type incomingGoAway struct { 184} 185 186func (*incomingGoAway) isTransportResponseFrame() bool { return false } 187 188type goAway struct { 189 code http2.ErrCode 190 debugData []byte 191 headsUp bool 192 closeConn bool 193} 194 195func (*goAway) isTransportResponseFrame() bool { return false } 196 197type ping struct { 198 ack bool 199 data [8]byte 200} 201 202func (*ping) isTransportResponseFrame() bool { return true } 203 204type outFlowControlSizeRequest struct { 205 resp chan uint32 206} 207 208func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false } 209 210type outStreamState int 211 212const ( 213 active outStreamState = iota 214 empty 215 waitingOnStreamQuota 216) 217 218type outStream struct { 219 id uint32 220 state outStreamState 221 itl *itemList 222 bytesOutStanding int 223 wq *writeQuota 224 225 next *outStream 226 prev *outStream 227} 228 229func (s *outStream) deleteSelf() { 230 if s.prev != nil { 231 s.prev.next = s.next 232 } 233 if s.next != nil { 234 s.next.prev = s.prev 235 } 236 s.next, s.prev = nil, nil 237} 238 239type outStreamList struct { 240 // Following are sentinel objects that mark the 241 // beginning and end of the list. They do not 242 // contain any item lists. All valid objects are 243 // inserted in between them. 244 // This is needed so that an outStream object can 245 // deleteSelf() in O(1) time without knowing which 246 // list it belongs to. 247 head *outStream 248 tail *outStream 249} 250 251func newOutStreamList() *outStreamList { 252 head, tail := new(outStream), new(outStream) 253 head.next = tail 254 tail.prev = head 255 return &outStreamList{ 256 head: head, 257 tail: tail, 258 } 259} 260 261func (l *outStreamList) enqueue(s *outStream) { 262 e := l.tail.prev 263 e.next = s 264 s.prev = e 265 s.next = l.tail 266 l.tail.prev = s 267} 268 269// remove from the beginning of the list. 270func (l *outStreamList) dequeue() *outStream { 271 b := l.head.next 272 if b == l.tail { 273 return nil 274 } 275 b.deleteSelf() 276 return b 277} 278 279// controlBuffer is a way to pass information to loopy. 280// Information is passed as specific struct types called control frames. 281// A control frame not only represents data, messages or headers to be sent out 282// but can also be used to instruct loopy to update its internal state. 283// It shouldn't be confused with an HTTP2 frame, although some of the control frames 284// like dataFrame and headerFrame do go out on wire as HTTP2 frames. 285type controlBuffer struct { 286 ch chan struct{} 287 done <-chan struct{} 288 mu sync.Mutex 289 consumerWaiting bool 290 list *itemList 291 err error 292 293 // transportResponseFrames counts the number of queued items that represent 294 // the response of an action initiated by the peer. trfChan is created 295 // when transportResponseFrames >= maxQueuedTransportResponseFrames and is 296 // closed and nilled when transportResponseFrames drops below the 297 // threshold. Both fields are protected by mu. 298 transportResponseFrames int 299 trfChan atomic.Value // chan struct{} 300} 301 302func newControlBuffer(done <-chan struct{}) *controlBuffer { 303 return &controlBuffer{ 304 ch: make(chan struct{}, 1), 305 list: &itemList{}, 306 done: done, 307 } 308} 309 310// throttle blocks if there are too many incomingSettings/cleanupStreams in the 311// controlbuf. 312func (c *controlBuffer) throttle() { 313 ch, _ := c.trfChan.Load().(chan struct{}) 314 if ch != nil { 315 select { 316 case <-ch: 317 case <-c.done: 318 } 319 } 320} 321 322func (c *controlBuffer) put(it cbItem) error { 323 _, err := c.executeAndPut(nil, it) 324 return err 325} 326 327func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) { 328 var wakeUp bool 329 c.mu.Lock() 330 if c.err != nil { 331 c.mu.Unlock() 332 return false, c.err 333 } 334 if f != nil { 335 if !f(it) { // f wasn't successful 336 c.mu.Unlock() 337 return false, nil 338 } 339 } 340 if c.consumerWaiting { 341 wakeUp = true 342 c.consumerWaiting = false 343 } 344 c.list.enqueue(it) 345 if it.isTransportResponseFrame() { 346 c.transportResponseFrames++ 347 if c.transportResponseFrames == maxQueuedTransportResponseFrames { 348 // We are adding the frame that puts us over the threshold; create 349 // a throttling channel. 350 c.trfChan.Store(make(chan struct{})) 351 } 352 } 353 c.mu.Unlock() 354 if wakeUp { 355 select { 356 case c.ch <- struct{}{}: 357 default: 358 } 359 } 360 return true, nil 361} 362 363// Note argument f should never be nil. 364func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) { 365 c.mu.Lock() 366 if c.err != nil { 367 c.mu.Unlock() 368 return false, c.err 369 } 370 if !f(it) { // f wasn't successful 371 c.mu.Unlock() 372 return false, nil 373 } 374 c.mu.Unlock() 375 return true, nil 376} 377 378func (c *controlBuffer) get(block bool) (interface{}, error) { 379 for { 380 c.mu.Lock() 381 if c.err != nil { 382 c.mu.Unlock() 383 return nil, c.err 384 } 385 if !c.list.isEmpty() { 386 h := c.list.dequeue().(cbItem) 387 if h.isTransportResponseFrame() { 388 if c.transportResponseFrames == maxQueuedTransportResponseFrames { 389 // We are removing the frame that put us over the 390 // threshold; close and clear the throttling channel. 391 ch := c.trfChan.Load().(chan struct{}) 392 close(ch) 393 c.trfChan.Store((chan struct{})(nil)) 394 } 395 c.transportResponseFrames-- 396 } 397 c.mu.Unlock() 398 return h, nil 399 } 400 if !block { 401 c.mu.Unlock() 402 return nil, nil 403 } 404 c.consumerWaiting = true 405 c.mu.Unlock() 406 select { 407 case <-c.ch: 408 case <-c.done: 409 return nil, ErrConnClosing 410 } 411 } 412} 413 414func (c *controlBuffer) finish() { 415 c.mu.Lock() 416 if c.err != nil { 417 c.mu.Unlock() 418 return 419 } 420 c.err = ErrConnClosing 421 // There may be headers for streams in the control buffer. 422 // These streams need to be cleaned out since the transport 423 // is still not aware of these yet. 424 for head := c.list.dequeueAll(); head != nil; head = head.next { 425 hdr, ok := head.it.(*headerFrame) 426 if !ok { 427 continue 428 } 429 if hdr.onOrphaned != nil { // It will be nil on the server-side. 430 hdr.onOrphaned(ErrConnClosing) 431 } 432 } 433 // In case throttle() is currently in flight, it needs to be unblocked. 434 // Otherwise, the transport may not close, since the transport is closed by 435 // the reader encountering the connection error. 436 ch, _ := c.trfChan.Load().(chan struct{}) 437 if ch != nil { 438 close(ch) 439 } 440 c.trfChan.Store((chan struct{})(nil)) 441 c.mu.Unlock() 442} 443 444type side int 445 446const ( 447 clientSide side = iota 448 serverSide 449) 450 451// Loopy receives frames from the control buffer. 452// Each frame is handled individually; most of the work done by loopy goes 453// into handling data frames. Loopy maintains a queue of active streams, and each 454// stream maintains a queue of data frames; as loopy receives data frames 455// it gets added to the queue of the relevant stream. 456// Loopy goes over this list of active streams by processing one node every iteration, 457// thereby closely resemebling to a round-robin scheduling over all streams. While 458// processing a stream, loopy writes out data bytes from this stream capped by the min 459// of http2MaxFrameLen, connection-level flow control and stream-level flow control. 460type loopyWriter struct { 461 side side 462 cbuf *controlBuffer 463 sendQuota uint32 464 oiws uint32 // outbound initial window size. 465 // estdStreams is map of all established streams that are not cleaned-up yet. 466 // On client-side, this is all streams whose headers were sent out. 467 // On server-side, this is all streams whose headers were received. 468 estdStreams map[uint32]*outStream // Established streams. 469 // activeStreams is a linked-list of all streams that have data to send and some 470 // stream-level flow control quota. 471 // Each of these streams internally have a list of data items(and perhaps trailers 472 // on the server-side) to be sent out. 473 activeStreams *outStreamList 474 framer *framer 475 hBuf *bytes.Buffer // The buffer for HPACK encoding. 476 hEnc *hpack.Encoder // HPACK encoder. 477 bdpEst *bdpEstimator 478 draining bool 479 480 // Side-specific handlers 481 ssGoAwayHandler func(*goAway) (bool, error) 482} 483 484func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter { 485 var buf bytes.Buffer 486 l := &loopyWriter{ 487 side: s, 488 cbuf: cbuf, 489 sendQuota: defaultWindowSize, 490 oiws: defaultWindowSize, 491 estdStreams: make(map[uint32]*outStream), 492 activeStreams: newOutStreamList(), 493 framer: fr, 494 hBuf: &buf, 495 hEnc: hpack.NewEncoder(&buf), 496 bdpEst: bdpEst, 497 } 498 return l 499} 500 501const minBatchSize = 1000 502 503// run should be run in a separate goroutine. 504// It reads control frames from controlBuf and processes them by: 505// 1. Updating loopy's internal state, or/and 506// 2. Writing out HTTP2 frames on the wire. 507// 508// Loopy keeps all active streams with data to send in a linked-list. 509// All streams in the activeStreams linked-list must have both: 510// 1. Data to send, and 511// 2. Stream level flow control quota available. 512// 513// In each iteration of run loop, other than processing the incoming control 514// frame, loopy calls processData, which processes one node from the activeStreams linked-list. 515// This results in writing of HTTP2 frames into an underlying write buffer. 516// When there's no more control frames to read from controlBuf, loopy flushes the write buffer. 517// As an optimization, to increase the batch size for each flush, loopy yields the processor, once 518// if the batch size is too low to give stream goroutines a chance to fill it up. 519func (l *loopyWriter) run() (err error) { 520 defer func() { 521 if err == ErrConnClosing { 522 // Don't log ErrConnClosing as error since it happens 523 // 1. When the connection is closed by some other known issue. 524 // 2. User closed the connection. 525 // 3. A graceful close of connection. 526 if logger.V(logLevel) { 527 logger.Infof("transport: loopyWriter.run returning. %v", err) 528 } 529 err = nil 530 } 531 }() 532 for { 533 it, err := l.cbuf.get(true) 534 if err != nil { 535 return err 536 } 537 if err = l.handle(it); err != nil { 538 return err 539 } 540 if _, err = l.processData(); err != nil { 541 return err 542 } 543 gosched := true 544 hasdata: 545 for { 546 it, err := l.cbuf.get(false) 547 if err != nil { 548 return err 549 } 550 if it != nil { 551 if err = l.handle(it); err != nil { 552 return err 553 } 554 if _, err = l.processData(); err != nil { 555 return err 556 } 557 continue hasdata 558 } 559 isEmpty, err := l.processData() 560 if err != nil { 561 return err 562 } 563 if !isEmpty { 564 continue hasdata 565 } 566 if gosched { 567 gosched = false 568 if l.framer.writer.offset < minBatchSize { 569 runtime.Gosched() 570 continue hasdata 571 } 572 } 573 l.framer.writer.Flush() 574 break hasdata 575 576 } 577 } 578} 579 580func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error { 581 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment) 582} 583 584func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error { 585 // Otherwise update the quota. 586 if w.streamID == 0 { 587 l.sendQuota += w.increment 588 return nil 589 } 590 // Find the stream and update it. 591 if str, ok := l.estdStreams[w.streamID]; ok { 592 str.bytesOutStanding -= int(w.increment) 593 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota { 594 str.state = active 595 l.activeStreams.enqueue(str) 596 return nil 597 } 598 } 599 return nil 600} 601 602func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error { 603 return l.framer.fr.WriteSettings(s.ss...) 604} 605 606func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { 607 if err := l.applySettings(s.ss); err != nil { 608 return err 609 } 610 return l.framer.fr.WriteSettingsAck() 611} 612 613func (l *loopyWriter) registerStreamHandler(h *registerStream) error { 614 str := &outStream{ 615 id: h.streamID, 616 state: empty, 617 itl: &itemList{}, 618 wq: h.wq, 619 } 620 l.estdStreams[h.streamID] = str 621 return nil 622} 623 624func (l *loopyWriter) headerHandler(h *headerFrame) error { 625 if l.side == serverSide { 626 str, ok := l.estdStreams[h.streamID] 627 if !ok { 628 if logger.V(logLevel) { 629 logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID) 630 } 631 return nil 632 } 633 // Case 1.A: Server is responding back with headers. 634 if !h.endStream { 635 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite) 636 } 637 // else: Case 1.B: Server wants to close stream. 638 639 if str.state != empty { // either active or waiting on stream quota. 640 // add it str's list of items. 641 str.itl.enqueue(h) 642 return nil 643 } 644 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil { 645 return err 646 } 647 return l.cleanupStreamHandler(h.cleanup) 648 } 649 // Case 2: Client wants to originate stream. 650 str := &outStream{ 651 id: h.streamID, 652 state: empty, 653 itl: &itemList{}, 654 wq: h.wq, 655 } 656 str.itl.enqueue(h) 657 return l.originateStream(str) 658} 659 660func (l *loopyWriter) originateStream(str *outStream) error { 661 hdr := str.itl.dequeue().(*headerFrame) 662 if err := hdr.initStream(str.id); err != nil { 663 if err == ErrConnClosing { 664 return err 665 } 666 // Other errors(errStreamDrain) need not close transport. 667 return nil 668 } 669 if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil { 670 return err 671 } 672 l.estdStreams[str.id] = str 673 return nil 674} 675 676func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error { 677 if onWrite != nil { 678 onWrite() 679 } 680 l.hBuf.Reset() 681 for _, f := range hf { 682 if err := l.hEnc.WriteField(f); err != nil { 683 if logger.V(logLevel) { 684 logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err) 685 } 686 } 687 } 688 var ( 689 err error 690 endHeaders, first bool 691 ) 692 first = true 693 for !endHeaders { 694 size := l.hBuf.Len() 695 if size > http2MaxFrameLen { 696 size = http2MaxFrameLen 697 } else { 698 endHeaders = true 699 } 700 if first { 701 first = false 702 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{ 703 StreamID: streamID, 704 BlockFragment: l.hBuf.Next(size), 705 EndStream: endStream, 706 EndHeaders: endHeaders, 707 }) 708 } else { 709 err = l.framer.fr.WriteContinuation( 710 streamID, 711 endHeaders, 712 l.hBuf.Next(size), 713 ) 714 } 715 if err != nil { 716 return err 717 } 718 } 719 return nil 720} 721 722func (l *loopyWriter) preprocessData(df *dataFrame) error { 723 str, ok := l.estdStreams[df.streamID] 724 if !ok { 725 return nil 726 } 727 // If we got data for a stream it means that 728 // stream was originated and the headers were sent out. 729 str.itl.enqueue(df) 730 if str.state == empty { 731 str.state = active 732 l.activeStreams.enqueue(str) 733 } 734 return nil 735} 736 737func (l *loopyWriter) pingHandler(p *ping) error { 738 if !p.ack { 739 l.bdpEst.timesnap(p.data) 740 } 741 return l.framer.fr.WritePing(p.ack, p.data) 742 743} 744 745func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error { 746 o.resp <- l.sendQuota 747 return nil 748} 749 750func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { 751 c.onWrite() 752 if str, ok := l.estdStreams[c.streamID]; ok { 753 // On the server side it could be a trailers-only response or 754 // a RST_STREAM before stream initialization thus the stream might 755 // not be established yet. 756 delete(l.estdStreams, c.streamID) 757 str.deleteSelf() 758 } 759 if c.rst { // If RST_STREAM needs to be sent. 760 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil { 761 return err 762 } 763 } 764 if l.side == clientSide && l.draining && len(l.estdStreams) == 0 { 765 return ErrConnClosing 766 } 767 return nil 768} 769 770func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error { 771 if l.side == clientSide { 772 return errors.New("earlyAbortStream not handled on client") 773 } 774 775 headerFields := []hpack.HeaderField{ 776 {Name: ":status", Value: "200"}, 777 {Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)}, 778 {Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))}, 779 {Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())}, 780 } 781 782 if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil { 783 return err 784 } 785 return nil 786} 787 788func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { 789 if l.side == clientSide { 790 l.draining = true 791 if len(l.estdStreams) == 0 { 792 return ErrConnClosing 793 } 794 } 795 return nil 796} 797 798func (l *loopyWriter) goAwayHandler(g *goAway) error { 799 // Handling of outgoing GoAway is very specific to side. 800 if l.ssGoAwayHandler != nil { 801 draining, err := l.ssGoAwayHandler(g) 802 if err != nil { 803 return err 804 } 805 l.draining = draining 806 } 807 return nil 808} 809 810func (l *loopyWriter) handle(i interface{}) error { 811 switch i := i.(type) { 812 case *incomingWindowUpdate: 813 return l.incomingWindowUpdateHandler(i) 814 case *outgoingWindowUpdate: 815 return l.outgoingWindowUpdateHandler(i) 816 case *incomingSettings: 817 return l.incomingSettingsHandler(i) 818 case *outgoingSettings: 819 return l.outgoingSettingsHandler(i) 820 case *headerFrame: 821 return l.headerHandler(i) 822 case *registerStream: 823 return l.registerStreamHandler(i) 824 case *cleanupStream: 825 return l.cleanupStreamHandler(i) 826 case *earlyAbortStream: 827 return l.earlyAbortStreamHandler(i) 828 case *incomingGoAway: 829 return l.incomingGoAwayHandler(i) 830 case *dataFrame: 831 return l.preprocessData(i) 832 case *ping: 833 return l.pingHandler(i) 834 case *goAway: 835 return l.goAwayHandler(i) 836 case *outFlowControlSizeRequest: 837 return l.outFlowControlSizeRequestHandler(i) 838 default: 839 return fmt.Errorf("transport: unknown control message type %T", i) 840 } 841} 842 843func (l *loopyWriter) applySettings(ss []http2.Setting) error { 844 for _, s := range ss { 845 switch s.ID { 846 case http2.SettingInitialWindowSize: 847 o := l.oiws 848 l.oiws = s.Val 849 if o < l.oiws { 850 // If the new limit is greater make all depleted streams active. 851 for _, stream := range l.estdStreams { 852 if stream.state == waitingOnStreamQuota { 853 stream.state = active 854 l.activeStreams.enqueue(stream) 855 } 856 } 857 } 858 case http2.SettingHeaderTableSize: 859 updateHeaderTblSize(l.hEnc, s.Val) 860 } 861 } 862 return nil 863} 864 865// processData removes the first stream from active streams, writes out at most 16KB 866// of its data and then puts it at the end of activeStreams if there's still more data 867// to be sent and stream has some stream-level flow control. 868func (l *loopyWriter) processData() (bool, error) { 869 if l.sendQuota == 0 { 870 return true, nil 871 } 872 str := l.activeStreams.dequeue() // Remove the first stream. 873 if str == nil { 874 return true, nil 875 } 876 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. 877 // A data item is represented by a dataFrame, since it later translates into 878 // multiple HTTP2 data frames. 879 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data. 880 // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the 881 // maximum possilbe HTTP2 frame size. 882 883 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame 884 // Client sends out empty data frame with endStream = true 885 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { 886 return false, err 887 } 888 str.itl.dequeue() // remove the empty data item from stream 889 if str.itl.isEmpty() { 890 str.state = empty 891 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. 892 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 893 return false, err 894 } 895 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 896 return false, nil 897 } 898 } else { 899 l.activeStreams.enqueue(str) 900 } 901 return false, nil 902 } 903 var ( 904 buf []byte 905 ) 906 // Figure out the maximum size we can send 907 maxSize := http2MaxFrameLen 908 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. 909 str.state = waitingOnStreamQuota 910 return false, nil 911 } else if maxSize > strQuota { 912 maxSize = strQuota 913 } 914 if maxSize > int(l.sendQuota) { // connection-level flow control. 915 maxSize = int(l.sendQuota) 916 } 917 // Compute how much of the header and data we can send within quota and max frame length 918 hSize := min(maxSize, len(dataItem.h)) 919 dSize := min(maxSize-hSize, len(dataItem.d)) 920 if hSize != 0 { 921 if dSize == 0 { 922 buf = dataItem.h 923 } else { 924 // We can add some data to grpc message header to distribute bytes more equally across frames. 925 // Copy on the stack to avoid generating garbage 926 var localBuf [http2MaxFrameLen]byte 927 copy(localBuf[:hSize], dataItem.h) 928 copy(localBuf[hSize:], dataItem.d[:dSize]) 929 buf = localBuf[:hSize+dSize] 930 } 931 } else { 932 buf = dataItem.d 933 } 934 935 size := hSize + dSize 936 937 // Now that outgoing flow controls are checked we can replenish str's write quota 938 str.wq.replenish(size) 939 var endStream bool 940 // If this is the last data message on this stream and all of it can be written in this iteration. 941 if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size { 942 endStream = true 943 } 944 if dataItem.onEachWrite != nil { 945 dataItem.onEachWrite() 946 } 947 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil { 948 return false, err 949 } 950 str.bytesOutStanding += size 951 l.sendQuota -= uint32(size) 952 dataItem.h = dataItem.h[hSize:] 953 dataItem.d = dataItem.d[dSize:] 954 955 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. 956 str.itl.dequeue() 957 } 958 if str.itl.isEmpty() { 959 str.state = empty 960 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers. 961 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 962 return false, err 963 } 964 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 965 return false, err 966 } 967 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. 968 str.state = waitingOnStreamQuota 969 } else { // Otherwise add it back to the list of active streams. 970 l.activeStreams.enqueue(str) 971 } 972 return false, nil 973} 974 975func min(a, b int) int { 976 if a < b { 977 return a 978 } 979 return b 980} 981