1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "bytes" 23 "fmt" 24 "runtime" 25 "sync" 26 27 "golang.org/x/net/http2" 28 "golang.org/x/net/http2/hpack" 29) 30 31var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { 32 e.SetMaxDynamicTableSizeLimit(v) 33} 34 35type itemNode struct { 36 it interface{} 37 next *itemNode 38} 39 40type itemList struct { 41 head *itemNode 42 tail *itemNode 43} 44 45func (il *itemList) enqueue(i interface{}) { 46 n := &itemNode{it: i} 47 if il.tail == nil { 48 il.head, il.tail = n, n 49 return 50 } 51 il.tail.next = n 52 il.tail = n 53} 54 55// peek returns the first item in the list without removing it from the 56// list. 57func (il *itemList) peek() interface{} { 58 return il.head.it 59} 60 61func (il *itemList) dequeue() interface{} { 62 if il.head == nil { 63 return nil 64 } 65 i := il.head.it 66 il.head = il.head.next 67 if il.head == nil { 68 il.tail = nil 69 } 70 return i 71} 72 73func (il *itemList) dequeueAll() *itemNode { 74 h := il.head 75 il.head, il.tail = nil, nil 76 return h 77} 78 79func (il *itemList) isEmpty() bool { 80 return il.head == nil 81} 82 83// The following defines various control items which could flow through 84// the control buffer of transport. They represent different aspects of 85// control tasks, e.g., flow control, settings, streaming resetting, etc. 86 87// registerStream is used to register an incoming stream with loopy writer. 88type registerStream struct { 89 streamID uint32 90 wq *writeQuota 91} 92 93// headerFrame is also used to register stream on the client-side. 94type headerFrame struct { 95 streamID uint32 96 hf []hpack.HeaderField 97 endStream bool // Valid on server side. 98 initStream func(uint32) (bool, error) // Used only on the client side. 99 onWrite func() 100 wq *writeQuota // write quota for the stream created. 101 cleanup *cleanupStream // Valid on the server side. 102 onOrphaned func(error) // Valid on client-side 103} 104 105type cleanupStream struct { 106 streamID uint32 107 rst bool 108 rstCode http2.ErrCode 109 onWrite func() 110} 111 112type dataFrame struct { 113 streamID uint32 114 endStream bool 115 h []byte 116 d []byte 117 // onEachWrite is called every time 118 // a part of d is written out. 119 onEachWrite func() 120} 121 122type incomingWindowUpdate struct { 123 streamID uint32 124 increment uint32 125} 126 127type outgoingWindowUpdate struct { 128 streamID uint32 129 increment uint32 130} 131 132type incomingSettings struct { 133 ss []http2.Setting 134} 135 136type outgoingSettings struct { 137 ss []http2.Setting 138} 139 140type incomingGoAway struct { 141} 142 143type goAway struct { 144 code http2.ErrCode 145 debugData []byte 146 headsUp bool 147 closeConn bool 148} 149 150type ping struct { 151 ack bool 152 data [8]byte 153} 154 155type outFlowControlSizeRequest struct { 156 resp chan uint32 157} 158 159type outStreamState int 160 161const ( 162 active outStreamState = iota 163 empty 164 waitingOnStreamQuota 165) 166 167type outStream struct { 168 id uint32 169 state outStreamState 170 itl *itemList 171 bytesOutStanding int 172 wq *writeQuota 173 174 next *outStream 175 prev *outStream 176} 177 178func (s *outStream) deleteSelf() { 179 if s.prev != nil { 180 s.prev.next = s.next 181 } 182 if s.next != nil { 183 s.next.prev = s.prev 184 } 185 s.next, s.prev = nil, nil 186} 187 188type outStreamList struct { 189 // Following are sentinel objects that mark the 190 // beginning and end of the list. They do not 191 // contain any item lists. All valid objects are 192 // inserted in between them. 193 // This is needed so that an outStream object can 194 // deleteSelf() in O(1) time without knowing which 195 // list it belongs to. 196 head *outStream 197 tail *outStream 198} 199 200func newOutStreamList() *outStreamList { 201 head, tail := new(outStream), new(outStream) 202 head.next = tail 203 tail.prev = head 204 return &outStreamList{ 205 head: head, 206 tail: tail, 207 } 208} 209 210func (l *outStreamList) enqueue(s *outStream) { 211 e := l.tail.prev 212 e.next = s 213 s.prev = e 214 s.next = l.tail 215 l.tail.prev = s 216} 217 218// remove from the beginning of the list. 219func (l *outStreamList) dequeue() *outStream { 220 b := l.head.next 221 if b == l.tail { 222 return nil 223 } 224 b.deleteSelf() 225 return b 226} 227 228// controlBuffer is a way to pass information to loopy. 229// Information is passed as specific struct types called control frames. 230// A control frame not only represents data, messages or headers to be sent out 231// but can also be used to instruct loopy to update its internal state. 232// It shouldn't be confused with an HTTP2 frame, although some of the control frames 233// like dataFrame and headerFrame do go out on wire as HTTP2 frames. 234type controlBuffer struct { 235 ch chan struct{} 236 done <-chan struct{} 237 mu sync.Mutex 238 consumerWaiting bool 239 list *itemList 240 err error 241} 242 243func newControlBuffer(done <-chan struct{}) *controlBuffer { 244 return &controlBuffer{ 245 ch: make(chan struct{}, 1), 246 list: &itemList{}, 247 done: done, 248 } 249} 250 251func (c *controlBuffer) put(it interface{}) error { 252 _, err := c.executeAndPut(nil, it) 253 return err 254} 255 256func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) { 257 var wakeUp bool 258 c.mu.Lock() 259 if c.err != nil { 260 c.mu.Unlock() 261 return false, c.err 262 } 263 if f != nil { 264 if !f(it) { // f wasn't successful 265 c.mu.Unlock() 266 return false, nil 267 } 268 } 269 if c.consumerWaiting { 270 wakeUp = true 271 c.consumerWaiting = false 272 } 273 c.list.enqueue(it) 274 c.mu.Unlock() 275 if wakeUp { 276 select { 277 case c.ch <- struct{}{}: 278 default: 279 } 280 } 281 return true, nil 282} 283 284// Note argument f should never be nil. 285func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) { 286 c.mu.Lock() 287 if c.err != nil { 288 c.mu.Unlock() 289 return false, c.err 290 } 291 if !f(it) { // f wasn't successful 292 c.mu.Unlock() 293 return false, nil 294 } 295 c.mu.Unlock() 296 return true, nil 297} 298 299func (c *controlBuffer) get(block bool) (interface{}, error) { 300 for { 301 c.mu.Lock() 302 if c.err != nil { 303 c.mu.Unlock() 304 return nil, c.err 305 } 306 if !c.list.isEmpty() { 307 h := c.list.dequeue() 308 c.mu.Unlock() 309 return h, nil 310 } 311 if !block { 312 c.mu.Unlock() 313 return nil, nil 314 } 315 c.consumerWaiting = true 316 c.mu.Unlock() 317 select { 318 case <-c.ch: 319 case <-c.done: 320 c.finish() 321 return nil, ErrConnClosing 322 } 323 } 324} 325 326func (c *controlBuffer) finish() { 327 c.mu.Lock() 328 if c.err != nil { 329 c.mu.Unlock() 330 return 331 } 332 c.err = ErrConnClosing 333 // There may be headers for streams in the control buffer. 334 // These streams need to be cleaned out since the transport 335 // is still not aware of these yet. 336 for head := c.list.dequeueAll(); head != nil; head = head.next { 337 hdr, ok := head.it.(*headerFrame) 338 if !ok { 339 continue 340 } 341 if hdr.onOrphaned != nil { // It will be nil on the server-side. 342 hdr.onOrphaned(ErrConnClosing) 343 } 344 } 345 c.mu.Unlock() 346} 347 348type side int 349 350const ( 351 clientSide side = iota 352 serverSide 353) 354 355// Loopy receives frames from the control buffer. 356// Each frame is handled individually; most of the work done by loopy goes 357// into handling data frames. Loopy maintains a queue of active streams, and each 358// stream maintains a queue of data frames; as loopy receives data frames 359// it gets added to the queue of the relevant stream. 360// Loopy goes over this list of active streams by processing one node every iteration, 361// thereby closely resemebling to a round-robin scheduling over all streams. While 362// processing a stream, loopy writes out data bytes from this stream capped by the min 363// of http2MaxFrameLen, connection-level flow control and stream-level flow control. 364type loopyWriter struct { 365 side side 366 cbuf *controlBuffer 367 sendQuota uint32 368 oiws uint32 // outbound initial window size. 369 // estdStreams is map of all established streams that are not cleaned-up yet. 370 // On client-side, this is all streams whose headers were sent out. 371 // On server-side, this is all streams whose headers were received. 372 estdStreams map[uint32]*outStream // Established streams. 373 // activeStreams is a linked-list of all streams that have data to send and some 374 // stream-level flow control quota. 375 // Each of these streams internally have a list of data items(and perhaps trailers 376 // on the server-side) to be sent out. 377 activeStreams *outStreamList 378 framer *framer 379 hBuf *bytes.Buffer // The buffer for HPACK encoding. 380 hEnc *hpack.Encoder // HPACK encoder. 381 bdpEst *bdpEstimator 382 draining bool 383 384 // Side-specific handlers 385 ssGoAwayHandler func(*goAway) (bool, error) 386} 387 388func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter { 389 var buf bytes.Buffer 390 l := &loopyWriter{ 391 side: s, 392 cbuf: cbuf, 393 sendQuota: defaultWindowSize, 394 oiws: defaultWindowSize, 395 estdStreams: make(map[uint32]*outStream), 396 activeStreams: newOutStreamList(), 397 framer: fr, 398 hBuf: &buf, 399 hEnc: hpack.NewEncoder(&buf), 400 bdpEst: bdpEst, 401 } 402 return l 403} 404 405const minBatchSize = 1000 406 407// run should be run in a separate goroutine. 408// It reads control frames from controlBuf and processes them by: 409// 1. Updating loopy's internal state, or/and 410// 2. Writing out HTTP2 frames on the wire. 411// 412// Loopy keeps all active streams with data to send in a linked-list. 413// All streams in the activeStreams linked-list must have both: 414// 1. Data to send, and 415// 2. Stream level flow control quota available. 416// 417// In each iteration of run loop, other than processing the incoming control 418// frame, loopy calls processData, which processes one node from the activeStreams linked-list. 419// This results in writing of HTTP2 frames into an underlying write buffer. 420// When there's no more control frames to read from controlBuf, loopy flushes the write buffer. 421// As an optimization, to increase the batch size for each flush, loopy yields the processor, once 422// if the batch size is too low to give stream goroutines a chance to fill it up. 423func (l *loopyWriter) run() (err error) { 424 defer func() { 425 if err == ErrConnClosing { 426 // Don't log ErrConnClosing as error since it happens 427 // 1. When the connection is closed by some other known issue. 428 // 2. User closed the connection. 429 // 3. A graceful close of connection. 430 infof("transport: loopyWriter.run returning. %v", err) 431 err = nil 432 } 433 }() 434 for { 435 it, err := l.cbuf.get(true) 436 if err != nil { 437 return err 438 } 439 if err = l.handle(it); err != nil { 440 return err 441 } 442 if _, err = l.processData(); err != nil { 443 return err 444 } 445 gosched := true 446 hasdata: 447 for { 448 it, err := l.cbuf.get(false) 449 if err != nil { 450 return err 451 } 452 if it != nil { 453 if err = l.handle(it); err != nil { 454 return err 455 } 456 if _, err = l.processData(); err != nil { 457 return err 458 } 459 continue hasdata 460 } 461 isEmpty, err := l.processData() 462 if err != nil { 463 return err 464 } 465 if !isEmpty { 466 continue hasdata 467 } 468 if gosched { 469 gosched = false 470 if l.framer.writer.offset < minBatchSize { 471 runtime.Gosched() 472 continue hasdata 473 } 474 } 475 l.framer.writer.Flush() 476 break hasdata 477 478 } 479 } 480} 481 482func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error { 483 return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment) 484} 485 486func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error { 487 // Otherwise update the quota. 488 if w.streamID == 0 { 489 l.sendQuota += w.increment 490 return nil 491 } 492 // Find the stream and update it. 493 if str, ok := l.estdStreams[w.streamID]; ok { 494 str.bytesOutStanding -= int(w.increment) 495 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota { 496 str.state = active 497 l.activeStreams.enqueue(str) 498 return nil 499 } 500 } 501 return nil 502} 503 504func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error { 505 return l.framer.fr.WriteSettings(s.ss...) 506} 507 508func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { 509 if err := l.applySettings(s.ss); err != nil { 510 return err 511 } 512 return l.framer.fr.WriteSettingsAck() 513} 514 515func (l *loopyWriter) registerStreamHandler(h *registerStream) error { 516 str := &outStream{ 517 id: h.streamID, 518 state: empty, 519 itl: &itemList{}, 520 wq: h.wq, 521 } 522 l.estdStreams[h.streamID] = str 523 return nil 524} 525 526func (l *loopyWriter) headerHandler(h *headerFrame) error { 527 if l.side == serverSide { 528 str, ok := l.estdStreams[h.streamID] 529 if !ok { 530 warningf("transport: loopy doesn't recognize the stream: %d", h.streamID) 531 return nil 532 } 533 // Case 1.A: Server is responding back with headers. 534 if !h.endStream { 535 return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite) 536 } 537 // else: Case 1.B: Server wants to close stream. 538 539 if str.state != empty { // either active or waiting on stream quota. 540 // add it str's list of items. 541 str.itl.enqueue(h) 542 return nil 543 } 544 if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil { 545 return err 546 } 547 return l.cleanupStreamHandler(h.cleanup) 548 } 549 // Case 2: Client wants to originate stream. 550 str := &outStream{ 551 id: h.streamID, 552 state: empty, 553 itl: &itemList{}, 554 wq: h.wq, 555 } 556 str.itl.enqueue(h) 557 return l.originateStream(str) 558} 559 560func (l *loopyWriter) originateStream(str *outStream) error { 561 hdr := str.itl.dequeue().(*headerFrame) 562 sendPing, err := hdr.initStream(str.id) 563 if err != nil { 564 if err == ErrConnClosing { 565 return err 566 } 567 // Other errors(errStreamDrain) need not close transport. 568 return nil 569 } 570 if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil { 571 return err 572 } 573 l.estdStreams[str.id] = str 574 if sendPing { 575 return l.pingHandler(&ping{data: [8]byte{}}) 576 } 577 return nil 578} 579 580func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error { 581 if onWrite != nil { 582 onWrite() 583 } 584 l.hBuf.Reset() 585 for _, f := range hf { 586 if err := l.hEnc.WriteField(f); err != nil { 587 warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err) 588 } 589 } 590 var ( 591 err error 592 endHeaders, first bool 593 ) 594 first = true 595 for !endHeaders { 596 size := l.hBuf.Len() 597 if size > http2MaxFrameLen { 598 size = http2MaxFrameLen 599 } else { 600 endHeaders = true 601 } 602 if first { 603 first = false 604 err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{ 605 StreamID: streamID, 606 BlockFragment: l.hBuf.Next(size), 607 EndStream: endStream, 608 EndHeaders: endHeaders, 609 }) 610 } else { 611 err = l.framer.fr.WriteContinuation( 612 streamID, 613 endHeaders, 614 l.hBuf.Next(size), 615 ) 616 } 617 if err != nil { 618 return err 619 } 620 } 621 return nil 622} 623 624func (l *loopyWriter) preprocessData(df *dataFrame) error { 625 str, ok := l.estdStreams[df.streamID] 626 if !ok { 627 return nil 628 } 629 // If we got data for a stream it means that 630 // stream was originated and the headers were sent out. 631 str.itl.enqueue(df) 632 if str.state == empty { 633 str.state = active 634 l.activeStreams.enqueue(str) 635 } 636 return nil 637} 638 639func (l *loopyWriter) pingHandler(p *ping) error { 640 if !p.ack { 641 l.bdpEst.timesnap(p.data) 642 } 643 return l.framer.fr.WritePing(p.ack, p.data) 644 645} 646 647func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error { 648 o.resp <- l.sendQuota 649 return nil 650} 651 652func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { 653 c.onWrite() 654 if str, ok := l.estdStreams[c.streamID]; ok { 655 // On the server side it could be a trailers-only response or 656 // a RST_STREAM before stream initialization thus the stream might 657 // not be established yet. 658 delete(l.estdStreams, c.streamID) 659 str.deleteSelf() 660 } 661 if c.rst { // If RST_STREAM needs to be sent. 662 if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil { 663 return err 664 } 665 } 666 if l.side == clientSide && l.draining && len(l.estdStreams) == 0 { 667 return ErrConnClosing 668 } 669 return nil 670} 671 672func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { 673 if l.side == clientSide { 674 l.draining = true 675 if len(l.estdStreams) == 0 { 676 return ErrConnClosing 677 } 678 } 679 return nil 680} 681 682func (l *loopyWriter) goAwayHandler(g *goAway) error { 683 // Handling of outgoing GoAway is very specific to side. 684 if l.ssGoAwayHandler != nil { 685 draining, err := l.ssGoAwayHandler(g) 686 if err != nil { 687 return err 688 } 689 l.draining = draining 690 } 691 return nil 692} 693 694func (l *loopyWriter) handle(i interface{}) error { 695 switch i := i.(type) { 696 case *incomingWindowUpdate: 697 return l.incomingWindowUpdateHandler(i) 698 case *outgoingWindowUpdate: 699 return l.outgoingWindowUpdateHandler(i) 700 case *incomingSettings: 701 return l.incomingSettingsHandler(i) 702 case *outgoingSettings: 703 return l.outgoingSettingsHandler(i) 704 case *headerFrame: 705 return l.headerHandler(i) 706 case *registerStream: 707 return l.registerStreamHandler(i) 708 case *cleanupStream: 709 return l.cleanupStreamHandler(i) 710 case *incomingGoAway: 711 return l.incomingGoAwayHandler(i) 712 case *dataFrame: 713 return l.preprocessData(i) 714 case *ping: 715 return l.pingHandler(i) 716 case *goAway: 717 return l.goAwayHandler(i) 718 case *outFlowControlSizeRequest: 719 return l.outFlowControlSizeRequestHandler(i) 720 default: 721 return fmt.Errorf("transport: unknown control message type %T", i) 722 } 723} 724 725func (l *loopyWriter) applySettings(ss []http2.Setting) error { 726 for _, s := range ss { 727 switch s.ID { 728 case http2.SettingInitialWindowSize: 729 o := l.oiws 730 l.oiws = s.Val 731 if o < l.oiws { 732 // If the new limit is greater make all depleted streams active. 733 for _, stream := range l.estdStreams { 734 if stream.state == waitingOnStreamQuota { 735 stream.state = active 736 l.activeStreams.enqueue(stream) 737 } 738 } 739 } 740 case http2.SettingHeaderTableSize: 741 updateHeaderTblSize(l.hEnc, s.Val) 742 } 743 } 744 return nil 745} 746 747// processData removes the first stream from active streams, writes out at most 16KB 748// of its data and then puts it at the end of activeStreams if there's still more data 749// to be sent and stream has some stream-level flow control. 750func (l *loopyWriter) processData() (bool, error) { 751 if l.sendQuota == 0 { 752 return true, nil 753 } 754 str := l.activeStreams.dequeue() // Remove the first stream. 755 if str == nil { 756 return true, nil 757 } 758 dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. 759 // A data item is represented by a dataFrame, since it later translates into 760 // multiple HTTP2 data frames. 761 // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data. 762 // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the 763 // maximum possilbe HTTP2 frame size. 764 765 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame 766 // Client sends out empty data frame with endStream = true 767 if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { 768 return false, err 769 } 770 str.itl.dequeue() // remove the empty data item from stream 771 if str.itl.isEmpty() { 772 str.state = empty 773 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. 774 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 775 return false, err 776 } 777 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 778 return false, nil 779 } 780 } else { 781 l.activeStreams.enqueue(str) 782 } 783 return false, nil 784 } 785 var ( 786 idx int 787 buf []byte 788 ) 789 if len(dataItem.h) != 0 { // data header has not been written out yet. 790 buf = dataItem.h 791 } else { 792 idx = 1 793 buf = dataItem.d 794 } 795 size := http2MaxFrameLen 796 if len(buf) < size { 797 size = len(buf) 798 } 799 if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. 800 str.state = waitingOnStreamQuota 801 return false, nil 802 } else if strQuota < size { 803 size = strQuota 804 } 805 806 if l.sendQuota < uint32(size) { // connection-level flow control. 807 size = int(l.sendQuota) 808 } 809 // Now that outgoing flow controls are checked we can replenish str's write quota 810 str.wq.replenish(size) 811 var endStream bool 812 // If this is the last data message on this stream and all of it can be written in this iteration. 813 if dataItem.endStream && size == len(buf) { 814 // buf contains either data or it contains header but data is empty. 815 if idx == 1 || len(dataItem.d) == 0 { 816 endStream = true 817 } 818 } 819 if dataItem.onEachWrite != nil { 820 dataItem.onEachWrite() 821 } 822 if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil { 823 return false, err 824 } 825 buf = buf[size:] 826 str.bytesOutStanding += size 827 l.sendQuota -= uint32(size) 828 if idx == 0 { 829 dataItem.h = buf 830 } else { 831 dataItem.d = buf 832 } 833 834 if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. 835 str.itl.dequeue() 836 } 837 if str.itl.isEmpty() { 838 str.state = empty 839 } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers. 840 if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { 841 return false, err 842 } 843 if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { 844 return false, err 845 } 846 } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. 847 str.state = waitingOnStreamQuota 848 } else { // Otherwise add it back to the list of active streams. 849 l.activeStreams.enqueue(str) 850 } 851 return false, nil 852} 853