1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "bytes" 23 "fmt" 24 "runtime" 25 "sync" 26 "sync/atomic" 27 28 "golang.org/x/net/http2" 29 "golang.org/x/net/http2/hpack" 30) 31 32var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { 33 e.SetMaxDynamicTableSizeLimit(v) 34} 35 36type itemNode struct { 37 it interface{} 38 next *itemNode 39} 40 41type itemList struct { 42 head *itemNode 43 tail *itemNode 44} 45 46func (il *itemList) enqueue(i interface{}) { 47 n := &itemNode{it: i} 48 if il.tail == nil { 49 il.head, il.tail = n, n 50 return 51 } 52 il.tail.next = n 53 il.tail = n 54} 55 56// peek returns the first item in the list without removing it from the 57// list. 58func (il *itemList) peek() interface{} { 59 return il.head.it 60} 61 62func (il *itemList) dequeue() interface{} { 63 if il.head == nil { 64 return nil 65 } 66 i := il.head.it 67 il.head = il.head.next 68 if il.head == nil { 69 il.tail = nil 70 } 71 return i 72} 73 74func (il *itemList) dequeueAll() *itemNode { 75 h := il.head 76 il.head, il.tail = nil, nil 77 return h 78} 79 80func (il *itemList) isEmpty() bool { 81 return il.head == nil 82} 83 84// The following defines various control items which could flow through 85// the control buffer of transport. They represent different aspects of 86// control tasks, e.g., flow control, settings, streaming resetting, etc. 87 88// maxQueuedTransportResponseFrames is the most queued "transport response" 89// frames we will buffer before preventing new reads from occurring on the 90// transport. These are control frames sent in response to client requests, 91// such as RST_STREAM due to bad headers or settings acks. 92const maxQueuedTransportResponseFrames = 50 93 94type cbItem interface { 95 isTransportResponseFrame() bool 96} 97 98// registerStream is used to register an incoming stream with loopy writer. 99type registerStream struct { 100 streamID uint32 101 wq *writeQuota 102} 103 104func (*registerStream) isTransportResponseFrame() bool { return false } 105 106// headerFrame is also used to register stream on the client-side. 107type headerFrame struct { 108 streamID uint32 109 hf []hpack.HeaderField 110 endStream bool // Valid on server side. 111 initStream func(uint32) error // Used only on the client side. 112 onWrite func() 113 wq *writeQuota // write quota for the stream created. 114 cleanup *cleanupStream // Valid on the server side. 115 onOrphaned func(error) // Valid on client-side 116} 117 118func (h *headerFrame) isTransportResponseFrame() bool { 119 return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM 120} 121 122type cleanupStream struct { 123 streamID uint32 124 rst bool 125 rstCode http2.ErrCode 126 onWrite func() 127} 128 129func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM 130 131type dataFrame struct { 132 streamID uint32 133 endStream bool 134 h []byte 135 d []byte 136 // onEachWrite is called every time 137 // a part of d is written out. 138 onEachWrite func() 139} 140 141func (*dataFrame) isTransportResponseFrame() bool { return false } 142 143type incomingWindowUpdate struct { 144 streamID uint32 145 increment uint32 146} 147 148func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false } 149 150type outgoingWindowUpdate struct { 151 streamID uint32 152 increment uint32 153} 154 155func (*outgoingWindowUpdate) isTransportResponseFrame() bool { 156 return false // window updates are throttled by thresholds 157} 158 159type incomingSettings struct { 160 ss []http2.Setting 161} 162 163func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK 164 165type outgoingSettings struct { 166 ss []http2.Setting 167} 168 169func (*outgoingSettings) isTransportResponseFrame() bool { return false } 170 171type incomingGoAway struct { 172} 173 174func (*incomingGoAway) isTransportResponseFrame() bool { return false } 175 176type goAway struct { 177 code http2.ErrCode 178 debugData []byte 179 headsUp bool 180 closeConn bool 181} 182 183func (*goAway) isTransportResponseFrame() bool { return false } 184 185type ping struct { 186 ack bool 187 data [8]byte 188} 189 190func (*ping) isTransportResponseFrame() bool { return true } 191 192type outFlowControlSizeRequest struct { 193 resp chan uint32 194} 195 196func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false } 197 198type outStreamState int 199 200const ( 201 active outStreamState = iota 202 empty 203 waitingOnStreamQuota 204) 205 206type outStream struct { 207 id uint32 208 state outStreamState 209 itl *itemList 210 bytesOutStanding int 211 wq *writeQuota 212 213 next *outStream 214 prev *outStream 215} 216 217func (s *outStream) deleteSelf() { 218 if s.prev != nil { 219 s.prev.next = s.next 220 } 221 if s.next != nil { 222 s.next.prev = s.prev 223 } 224 s.next, s.prev = nil, nil 225} 226 227type outStreamList struct { 228 // Following are sentinel objects that mark the 229 // beginning and end of the list. They do not 230 // contain any item lists. All valid objects are 231 // inserted in between them. 232 // This is needed so that an outStream object can 233 // deleteSelf() in O(1) time without knowing which 234 // list it belongs to. 235 head *outStream 236 tail *outStream 237} 238 239func newOutStreamList() *outStreamList { 240 head, tail := new(outStream), new(outStream) 241 head.next = tail 242 tail.prev = head 243 return &outStreamList{ 244 head: head, 245 tail: tail, 246 } 247} 248 249func (l *outStreamList) enqueue(s *outStream) { 250 e := l.tail.prev 251 e.next = s 252 s.prev = e 253 s.next = l.tail 254 l.tail.prev = s 255} 256 257// remove from the beginning of the list. 258func (l *outStreamList) dequeue() *outStream { 259 b := l.head.next 260 if b == l.tail { 261 return nil 262 } 263 b.deleteSelf() 264 return b 265} 266 267// controlBuffer is a way to pass information to loopy. 268// Information is passed as specific struct types called control frames. 269// A control frame not only represents data, messages or headers to be sent out 270// but can also be used to instruct loopy to update its internal state. 271// It shouldn't be confused with an HTTP2 frame, although some of the control frames 272// like dataFrame and headerFrame do go out on wire as HTTP2 frames. 273type controlBuffer struct { 274 ch chan struct{} 275 done <-chan struct{} 276 mu sync.Mutex 277 consumerWaiting bool 278 list *itemList 279 err error 280 281 // transportResponseFrames counts the number of queued items that represent 282 // the response of an action initiated by the peer. trfChan is created 283 // when transportResponseFrames >= maxQueuedTransportResponseFrames and is 284 // closed and nilled when transportResponseFrames drops below the 285 // threshold. Both fields are protected by mu. 286 transportResponseFrames int 287 trfChan atomic.Value // *chan struct{} 288} 289 290func newControlBuffer(done <-chan struct{}) *controlBuffer { 291 return &controlBuffer{ 292 ch: make(chan struct{}, 1), 293 list: &itemList{}, 294 done: done, 295 } 296} 297 298// throttle blocks if there are too many incomingSettings/cleanupStreams in the 299// controlbuf. 300func (c *controlBuffer) throttle() { 301 ch, _ := c.trfChan.Load().(*chan struct{}) 302 if ch != nil { 303 select { 304 case <-*ch: 305 case <-c.done: 306 } 307 } 308} 309 310func (c *controlBuffer) put(it cbItem) error { 311 _, err := c.executeAndPut(nil, it) 312 return err 313} 314 315func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) { 316 var wakeUp bool 317 c.mu.Lock() 318 if c.err != nil { 319 c.mu.Unlock() 320 return false, c.err 321 } 322 if f != nil { 323 if !f(it) { // f wasn't successful 324 c.mu.Unlock() 325 return false, nil 326 } 327 } 328 if c.consumerWaiting { 329 wakeUp = true 330 c.consumerWaiting = false 331 } 332 c.list.enqueue(it) 333 if it.isTransportResponseFrame() { 334 c.transportResponseFrames++ 335 if c.transportResponseFrames == maxQueuedTransportResponseFrames { 336 // We are adding the frame that puts us over the threshold; create 337 // a throttling channel. 338 ch := make(chan struct{}) 339 c.trfChan.Store(&ch) 340 } 341 } 342 c.mu.Unlock() 343 if wakeUp { 344 select { 345 case c.ch <- struct{}{}: 346 default: 347 } 348 } 349 return true, nil 350} 351 352// Note argument f should never be nil. 353func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) { 354 c.mu.Lock() 355 if c.err != nil { 356 c.mu.Unlock() 357 return false, c.err 358 } 359 if !f(it) { // f wasn't successful 360 c.mu.Unlock() 361 return false, nil 362 } 363 c.mu.Unlock() 364 return true, nil 365} 366 367func (c *controlBuffer) get(block bool) (interface{}, error) { 368 for { 369 c.mu.Lock() 370 if c.err != nil { 371 c.mu.Unlock() 372 return nil, c.err 373 } 374 if !c.list.isEmpty() { 375 h := c.list.dequeue().(cbItem) 376 if h.isTransportResponseFrame() { 377 if c.transportResponseFrames == maxQueuedTransportResponseFrames { 378 // We are removing the frame that put us over the 379 // threshold; close and clear the throttling channel. 380 ch := c.trfChan.Load().(*chan struct{}) 381 close(*ch) 382 c.trfChan.Store((*chan struct{})(nil)) 383 } 384 c.transportResponseFrames-- 385 } 386 c.mu.Unlock() 387 return h, nil 388 } 389 if !block { 390 c.mu.Unlock() 391 return nil, nil 392 } 393 c.consumerWaiting = true 394 c.mu.Unlock() 395 select { 396 case <-c.ch: 397 case <-c.done: 398 c.finish() 399 return nil, ErrConnClosing 400 } 401 } 402} 403 404func (c *controlBuffer) finish() { 405 c.mu.Lock() 406 if c.err != nil { 407 c.mu.Unlock() 408 return 409 } 410 c.err = ErrConnClosing 411 // There may be headers for streams in the control buffer. 412 // These streams need to be cleaned out since the transport 413 // is still not aware of these yet. 414 for head := c.list.dequeueAll(); head != nil; head = head.next { 415 hdr, ok := head.it.(*headerFrame) 416 if !ok { 417 continue 418 } 419 if hdr.onOrphaned != nil { // It will be nil on the server-side. 420 hdr.onOrphaned(ErrConnClosing) 421 } 422 } 423 c.mu.Unlock() 424} 425 426type side int 427 428const ( 429 clientSide side = iota 430 serverSide 431) 432 433// Loopy receives frames from the control buffer. 434// Each frame is handled individually; most of the work done by loopy goes 435// into handling data frames. Loopy maintains a queue of active streams, and each 436// stream maintains a queue of data frames; as loopy receives data frames 437// it gets added to the queue of the relevant stream. 438// Loopy goes over this list of active streams by processing one node every iteration, 439// thereby closely resemebling to a round-robin scheduling over all streams. While 440// processing a stream, loopy writes out data bytes from this stream capped by the min 441// of http2MaxFrameLen, connection-level flow control and stream-level flow control. 442type loopyWriter struct { 443 side side 444 cbuf *controlBuffer 445 sendQuota uint32 446 oiws uint32 // outbound initial window size. 447 // estdStreams is map of all established streams that are not cleaned-up yet. 448 // On client-side, this is all streams whose headers were sent out. 449 // On server-side, this is all streams whose headers were received. 450 estdStreams map[uint32]*outStream // Established streams. 451 // activeStreams is a linked-list of all streams that have data to send and some 452 // stream-level flow control quota. 453 // Each of these streams internally have a list of data items(and perhaps trailers 454 // on the server-side) to be sent out. 455 activeStreams *outStreamList 456 framer *framer 457 hBuf *bytes.Buffer // The buffer for HPACK encoding. 458 hEnc *hpack.Encoder // HPACK encoder. 459 bdpEst *bdpEstimator 460 draining bool 461 462 // Side-specific handlers 463 ssGoAwayHandler func(*goAway) (bool, error) 464} 465 466func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter { 467 var buf bytes.Buffer 468 l := &loopyWriter{ 469 side: s, 470 cbuf: cbuf, 471 sendQuota: defaultWindowSize, 472 oiws: defaultWindowSize, 473 estdStreams: make(map[uint32]*outStream), 474 activeStreams: newOutStreamList(), 475 framer: fr, 476 hBuf: &buf, 477 hEnc: hpack.NewEncoder(&buf), 478 bdpEst: bdpEst, 479 } 480 return l 481} 482 483const minBatchSize = 1000 484 485// run should be run in a separate goroutine. 486// It reads control frames from controlBuf and processes them by: 487// 1. Updating loopy's internal state, or/and 488// 2. Writing out HTTP2 frames on the wire. 489// 490// Loopy keeps all active streams with data to send in a linked-list. 491// All streams in the activeStreams linked-list must have both: 492// 1. Data to send, and 493// 2. Stream level flow control quota available. 494// 495// In each iteration of run loop, other than processing the incoming control 496// frame, loopy calls processData, which processes one node from the activeStreams linked-list. 497// This results in writing of HTTP2 frames into an underlying write buffer. 498// When there's no more control frames to read from controlBuf, loopy flushes the write buffer. 499// As an optimization, to increase the batch size for each flush, loopy yields the processor, once 500// if the batch size is too low to give stream goroutines a chance to fill it up. 501func (l *loopyWriter) run() (err error) { 502 defer func() { 503 if err == ErrConnClosing { 504 // Don't log ErrConnClosing as error since it happens 505 // 1. When the connection is closed by some other known issue. 506 // 2. User closed the connection. 507 // 3. A graceful close of connection. 508 infof("transport: loopyWriter.run returning. %v", err) 509 err = nil 510 } 511 }() 512 for { 513 it, err := l.cbuf.get(true) 514 if err != nil { 515 return err 516 } 517 if err = l.handle(it); err != nil { 518 return err 519 } 520 if _, err = l.processData(); err != nil { 521 return err 522 } 523 gosched := true 524 hasdata: 525 for { 526 it, err := l.cbuf.get(false) 527 if err != nil { 528 return err 529 } 530 if it != nil { 531 if err = l.handle(it); err != nil { 532 return err 533 } 534 if _, err = l.processData(); err != nil { 535 return err 536 } 537 continue hasdata 538 } 539 isEmpty, err := l.processData() 540 if err != nil { 541 return err 542 } 543 if !isEmpty { 544 continue hasdata 545 } 546 if gosched { 547 gosched = false 548 if l.framer.writer.offset < minBatchSize { 549 runtime.Gosched() 550 continue hasdata 551 } 552 } 553 l.framer.writer.Flush() 554 break hasdata 555 556 } 557 } 558} 559 560func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error { 561 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment) 562} 563 564func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error { 565 // Otherwise update the quota. 566 if w.streamID == 0 { 567 l.sendQuota += w.increment 568 return nil 569 } 570 // Find the stream and update it. 571 if str, ok := l.estdStreams[w.streamID]; ok { 572 str.bytesOutStanding -= int(w.increment) 573 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota { 574 str.state = active 575 l.activeStreams.enqueue(str) 576 return nil 577 } 578 } 579 return nil 580} 581 582func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error { 583 return l.framer.fr.WriteSettings(s.ss...) 584} 585 586func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { 587 if err := l.applySettings(s.ss); err != nil { 588 return err 589 } 590 return l.framer.fr.WriteSettingsAck() 591} 592 593func (l *loopyWriter) registerStreamHandler(h *registerStream) error { 594 str := &outStream{ 595 id: h.streamID, 596 state: empty, 597 itl: &itemList{}, 598 wq: h.wq, 599 } 600 l.estdStreams[h.streamID] = str 601 return nil 602} 603 604func (l *loopyWriter) headerHandler(h *headerFrame) error { 605 if l.side == serverSide { 606 str, ok := l.estdStreams[h.streamID] 607 if !ok { 608 warningf("transport: loopy doesn't recognize the stream: %d", h.streamID) 609 return nil 610 } 611 // Case 1.A: Server is responding back with headers. 612 if !h.endStream { 613 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite) 614 } 615 // else: Case 1.B: Server wants to close stream. 616 617 if str.state != empty { // either active or waiting on stream quota. 618 // add it str's list of items. 619 str.itl.enqueue(h) 620 return nil 621 } 622 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil { 623 return err 624 } 625 return l.cleanupStreamHandler(h.cleanup) 626 } 627 // Case 2: Client wants to originate stream. 628 str := &outStream{ 629 id: h.streamID, 630 state: empty, 631 itl: &itemList{}, 632 wq: h.wq, 633 } 634 str.itl.enqueue(h) 635 return l.originateStream(str) 636} 637 638func (l *loopyWriter) originateStream(str *outStream) error { 639 hdr := str.itl.dequeue().(*headerFrame) 640 if err := hdr.initStream(str.id); err != nil { 641 if err == ErrConnClosing { 642 return err 643 } 644 // Other errors(errStreamDrain) need not close transport. 645 return nil 646 } 647 if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil { 648 return err 649 } 650 l.estdStreams[str.id] = str 651 return nil 652} 653 654func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error { 655 if onWrite != nil { 656 onWrite() 657 } 658 l.hBuf.Reset() 659 for _, f := range hf { 660 if err := l.hEnc.WriteField(f); err != nil { 661 warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err) 662 } 663 } 664 var ( 665 err error 666 endHeaders, first bool 667 ) 668 first = true 669 for !endHeaders { 670 size := l.hBuf.Len() 671 if size > http2MaxFrameLen { 672 size = http2MaxFrameLen 673 } else { 674 endHeaders = true 675 } 676 if first { 677 first = false 678 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{ 679 StreamID: streamID, 680 BlockFragment: l.hBuf.Next(size), 681 EndStream: endStream, 682 EndHeaders: endHeaders, 683 }) 684 } else { 685 err = l.framer.fr.WriteContinuation( 686 streamID, 687 endHeaders, 688 l.hBuf.Next(size), 689 ) 690 } 691 if err != nil { 692 return err 693 } 694 } 695 return nil 696} 697 698func (l *loopyWriter) preprocessData(df *dataFrame) error { 699 str, ok := l.estdStreams[df.streamID] 700 if !ok { 701 return nil 702 } 703 // If we got data for a stream it means that 704 // stream was originated and the headers were sent out. 705 str.itl.enqueue(df) 706 if str.state == empty { 707 str.state = active 708 l.activeStreams.enqueue(str) 709 } 710 return nil 711} 712 713func (l *loopyWriter) pingHandler(p *ping) error { 714 if !p.ack { 715 l.bdpEst.timesnap(p.data) 716 } 717 return l.framer.fr.WritePing(p.ack, p.data) 718 719} 720 721func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error { 722 o.resp <- l.sendQuota 723 return nil 724} 725 726func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { 727 c.onWrite() 728 if str, ok := l.estdStreams[c.streamID]; ok { 729 // On the server side it could be a trailers-only response or 730 // a RST_STREAM before stream initialization thus the stream might 731 // not be established yet. 732 delete(l.estdStreams, c.streamID) 733 str.deleteSelf() 734 } 735 if c.rst { // If RST_STREAM needs to be sent. 736 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil { 737 return err 738 } 739 } 740 if l.side == clientSide && l.draining && len(l.estdStreams) == 0 { 741 return ErrConnClosing 742 } 743 return nil 744} 745 746func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { 747 if l.side == clientSide { 748 l.draining = true 749 if len(l.estdStreams) == 0 { 750 return ErrConnClosing 751 } 752 } 753 return nil 754} 755 756func (l *loopyWriter) goAwayHandler(g *goAway) error { 757 // Handling of outgoing GoAway is very specific to side. 758 if l.ssGoAwayHandler != nil { 759 draining, err := l.ssGoAwayHandler(g) 760 if err != nil { 761 return err 762 } 763 l.draining = draining 764 } 765 return nil 766} 767 768func (l *loopyWriter) handle(i interface{}) error { 769 switch i := i.(type) { 770 case *incomingWindowUpdate: 771 return l.incomingWindowUpdateHandler(i) 772 case *outgoingWindowUpdate: 773 return l.outgoingWindowUpdateHandler(i) 774 case *incomingSettings: 775 return l.incomingSettingsHandler(i) 776 case *outgoingSettings: 777 return l.outgoingSettingsHandler(i) 778 case *headerFrame: 779 return l.headerHandler(i) 780 case *registerStream: 781 return l.registerStreamHandler(i) 782 case *cleanupStream: 783 return l.cleanupStreamHandler(i) 784 case *incomingGoAway: 785 return l.incomingGoAwayHandler(i) 786 case *dataFrame: 787 return l.preprocessData(i) 788 case *ping: 789 return l.pingHandler(i) 790 case *goAway: 791 return l.goAwayHandler(i) 792 case *outFlowControlSizeRequest: 793 return l.outFlowControlSizeRequestHandler(i) 794 default: 795 return fmt.Errorf("transport: unknown control message type %T", i) 796 } 797} 798 799func (l *loopyWriter) applySettings(ss []http2.Setting) error { 800 for _, s := range ss { 801 switch s.ID { 802 case http2.SettingInitialWindowSize: 803 o := l.oiws 804 l.oiws = s.Val 805 if o < l.oiws { 806 // If the new limit is greater make all depleted streams active. 807 for _, stream := range l.estdStreams { 808 if stream.state == waitingOnStreamQuota { 809 stream.state = active 810 l.activeStreams.enqueue(stream) 811 } 812 } 813 } 814 case http2.SettingHeaderTableSize: 815 updateHeaderTblSize(l.hEnc, s.Val) 816 } 817 } 818 return nil 819} 820 821// processData removes the first stream from active streams, writes out at most 16KB 822// of its data and then puts it at the end of activeStreams if there's still more data 823// to be sent and stream has some stream-level flow control. 824func (l *loopyWriter) processData() (bool, error) { 825 if l.sendQuota == 0 { 826 return true, nil 827 } 828 str := l.activeStreams.dequeue() // Remove the first stream. 829 if str == nil { 830 return true, nil 831 } 832 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. 833 // A data item is represented by a dataFrame, since it later translates into 834 // multiple HTTP2 data frames. 835 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data. 836 // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the 837 // maximum possilbe HTTP2 frame size. 838 839 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame 840 // Client sends out empty data frame with endStream = true 841 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { 842 return false, err 843 } 844 str.itl.dequeue() // remove the empty data item from stream 845 if str.itl.isEmpty() { 846 str.state = empty 847 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. 848 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 849 return false, err 850 } 851 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 852 return false, nil 853 } 854 } else { 855 l.activeStreams.enqueue(str) 856 } 857 return false, nil 858 } 859 var ( 860 idx int 861 buf []byte 862 ) 863 if len(dataItem.h) != 0 { // data header has not been written out yet. 864 buf = dataItem.h 865 } else { 866 idx = 1 867 buf = dataItem.d 868 } 869 size := http2MaxFrameLen 870 if len(buf) < size { 871 size = len(buf) 872 } 873 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. 874 str.state = waitingOnStreamQuota 875 return false, nil 876 } else if strQuota < size { 877 size = strQuota 878 } 879 880 if l.sendQuota < uint32(size) { // connection-level flow control. 881 size = int(l.sendQuota) 882 } 883 // Now that outgoing flow controls are checked we can replenish str's write quota 884 str.wq.replenish(size) 885 var endStream bool 886 // If this is the last data message on this stream and all of it can be written in this iteration. 887 if dataItem.endStream && size == len(buf) { 888 // buf contains either data or it contains header but data is empty. 889 if idx == 1 || len(dataItem.d) == 0 { 890 endStream = true 891 } 892 } 893 if dataItem.onEachWrite != nil { 894 dataItem.onEachWrite() 895 } 896 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil { 897 return false, err 898 } 899 buf = buf[size:] 900 str.bytesOutStanding += size 901 l.sendQuota -= uint32(size) 902 if idx == 0 { 903 dataItem.h = buf 904 } else { 905 dataItem.d = buf 906 } 907 908 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. 909 str.itl.dequeue() 910 } 911 if str.itl.isEmpty() { 912 str.state = empty 913 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers. 914 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 915 return false, err 916 } 917 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 918 return false, err 919 } 920 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. 921 str.state = waitingOnStreamQuota 922 } else { // Otherwise add it back to the list of active streams. 923 l.activeStreams.enqueue(str) 924 } 925 return false, nil 926} 927