1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22	"bytes"
23	"fmt"
24	"runtime"
25	"sync"
26
27	"golang.org/x/net/http2"
28	"golang.org/x/net/http2/hpack"
29)
30
31var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
32	e.SetMaxDynamicTableSizeLimit(v)
33}
34
// itemNode is a single element of an itemList.
type itemNode struct {
	it   interface{} // The queued item.
	next *itemNode   // The following node; nil for the last node.
}

// itemList is a singly linked FIFO queue of arbitrary items.
// The zero value is an empty list that is ready to use.
type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue appends i to the end of the list.
func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list. Like dequeue, it returns nil when the list is empty instead of
// dereferencing a nil head.
func (il *itemList) peek() interface{} {
	if il.head == nil {
		return nil
	}
	return il.head.it
}

// dequeue removes and returns the first item in the list, or nil if the
// list is empty.
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		// The last node was removed; tail must not keep pointing at it.
		il.tail = nil
	}
	return i
}

// dequeueAll empties the list and returns its former first node; the caller
// can walk the removed items through the next pointers. It returns nil if
// the list was empty.
func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

// isEmpty reports whether the list holds no items.
func (il *itemList) isEmpty() bool {
	return il.head == nil
}
82
// The following defines various control items which can flow through
// the control buffer of the transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resets, etc.
86
// registerStream is used to register an incoming stream with loopy writer.
// When loopy handles it, an outStream for streamID is created in the empty
// state, with wq as its write quota, and recorded among loopy's established
// streams.
type registerStream struct {
	streamID uint32
	wq       *writeQuota
}
92
// headerFrame carries the header fields (hf) that loopy should write out for
// a stream. headerFrame is also used to register stream on the client-side.
//
// On the server side loopy writes the headers right away unless endStream is
// set. With endStream set, the frame is queued behind any items the stream
// still has pending, and cleanup is processed once the headers are written.
// On the client side loopy first calls initStream with the stream ID; its
// boolean result asks for a ping to be sent after the headers.
//
// onWrite, if non-nil, is invoked just before the headers are encoded.
// onOrphaned, if non-nil, is invoked with ErrConnClosing when the control
// buffer is finished while this frame is still sitting in it.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool                       // Valid on server side.
	initStream func(uint32) (bool, error) // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on client-side
}
104
// cleanupStream tells loopy to drop its state for a stream. Loopy calls
// onWrite unconditionally (so it must be non-nil), removes the stream from
// its established streams and unlinks it from any stream list if it was
// known, and, when rst is set, writes a RST_STREAM frame carrying rstCode.
type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}
111
// dataFrame is a message queued for sending on a stream. Loopy may write it
// out as several HTTP2 DATA frames, each limited by the maximum frame length
// and by stream-level and connection-level flow control.
//
// h is written out before d, and loopy advances both slices as their bytes
// are written. (Per the comments in processData, h holds the grpc-message
// header and d the message data.) endStream marks the last message on the
// stream; a dataFrame with both h and d empty is written as an empty DATA
// frame.
type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}
121
// incomingWindowUpdate grants loopy additional send quota (presumably
// reported by a WINDOW_UPDATE frame received from the peer — TODO confirm).
// A streamID of 0 adds increment to the connection-level send quota; any
// other value subtracts increment from that stream's outstanding byte count,
// provided the stream is established.
type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}
126
// outgoingWindowUpdate asks loopy to write a WINDOW_UPDATE frame for
// streamID with the given increment.
type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}
131
// incomingSettings carries settings that loopy applies to its own state and
// then acknowledges by writing a SETTINGS ack frame.
type incomingSettings struct {
	ss []http2.Setting
}
135
// outgoingSettings asks loopy to write a SETTINGS frame containing ss.
type outgoingSettings struct {
	ss []http2.Setting
}
139
// incomingGoAway notifies loopy of a GOAWAY. On the client side, handling it
// puts loopy into draining mode, and loopy stops with ErrConnClosing as soon
// as no established streams remain. On the server side loopy ignores it.
type incomingGoAway struct {
}
142
// goAway is handed to loopy's side-specific ssGoAwayHandler, if one is
// installed; the handler's boolean result becomes loopy's draining flag.
// How the individual fields are interpreted is up to that handler, which is
// not defined in this part of the file.
type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn bool
}
149
// ping asks loopy to write a PING frame with the given ack flag and payload.
// For a non-ack ping, loopy first passes data to the BDP estimator's
// timesnap method.
type ping struct {
	ack  bool
	data [8]byte
}
154
// outFlowControlSizeRequest asks loopy for its current connection-level send
// quota. Loopy delivers the value with a plain (blocking) send on resp, so
// resp must be buffered or have a receiver ready.
type outFlowControlSizeRequest struct {
	resp chan uint32
}
158
// outStreamState describes where an outStream stands with respect to loopy's
// list of active streams.
type outStreamState int

const (
	// active: the stream has items to send and has been put on the list of
	// active streams (or is the one currently being processed).
	active outStreamState = iota
	// empty: the stream has no items queued.
	empty
	// waitingOnStreamQuota: the stream has items queued but its stream-level
	// flow control quota is used up, so it is kept off the active list.
	waitingOnStreamQuota
)
166
// outStream is loopy's per-stream bookkeeping for outgoing data.
//
// itl queues the stream's pending items: data frames and, on the server
// side, possibly a trailing headerFrame. bytesOutStanding grows by the number
// of data bytes written and shrinks by the increments of incoming window
// updates; the stream-level quota is the outbound initial window size minus
// this value. wq is replenished by processData as data is written out.
// next and prev link the stream into an outStreamList.
type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota

	next *outStream
	prev *outStream
}
177
178func (s *outStream) deleteSelf() {
179	if s.prev != nil {
180		s.prev.next = s.next
181	}
182	if s.next != nil {
183		s.next.prev = s.prev
184	}
185	s.next, s.prev = nil, nil
186}
187
// outStreamList is a doubly linked list of outStreams, used as a FIFO queue:
// enqueue adds at the tail end and dequeue removes from the head end.
type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}
199
200func newOutStreamList() *outStreamList {
201	head, tail := new(outStream), new(outStream)
202	head.next = tail
203	tail.prev = head
204	return &outStreamList{
205		head: head,
206		tail: tail,
207	}
208}
209
210func (l *outStreamList) enqueue(s *outStream) {
211	e := l.tail.prev
212	e.next = s
213	s.prev = e
214	s.next = l.tail
215	l.tail.prev = s
216}
217
218// remove from the beginning of the list.
219func (l *outStreamList) dequeue() *outStream {
220	b := l.head.next
221	if b == l.tail {
222		return nil
223	}
224	b.deleteSelf()
225	return b
226}
227
// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
//
// Fields:
//   - ch is a channel of capacity 1 used to wake up a consumer blocked in get.
//   - done makes a blocked get give up: once it is readable, get finishes the
//     buffer and returns ErrConnClosing.
//   - mu is held around every access to consumerWaiting, list and err below.
//   - consumerWaiting is set by get right before it blocks; the next producer
//     clears it and signals ch.
//   - list holds the queued control frames in FIFO order.
//   - err is set by finish; once non-nil, put/execute/get return it.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error
}
242
243func newControlBuffer(done <-chan struct{}) *controlBuffer {
244	return &controlBuffer{
245		ch:   make(chan struct{}, 1),
246		list: &itemList{},
247		done: done,
248	}
249}
250
251func (c *controlBuffer) put(it interface{}) error {
252	_, err := c.executeAndPut(nil, it)
253	return err
254}
255
256func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
257	var wakeUp bool
258	c.mu.Lock()
259	if c.err != nil {
260		c.mu.Unlock()
261		return false, c.err
262	}
263	if f != nil {
264		if !f(it) { // f wasn't successful
265			c.mu.Unlock()
266			return false, nil
267		}
268	}
269	if c.consumerWaiting {
270		wakeUp = true
271		c.consumerWaiting = false
272	}
273	c.list.enqueue(it)
274	c.mu.Unlock()
275	if wakeUp {
276		select {
277		case c.ch <- struct{}{}:
278		default:
279		}
280	}
281	return true, nil
282}
283
284// Note argument f should never be nil.
285func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
286	c.mu.Lock()
287	if c.err != nil {
288		c.mu.Unlock()
289		return false, c.err
290	}
291	if !f(it) { // f wasn't successful
292		c.mu.Unlock()
293		return false, nil
294	}
295	c.mu.Unlock()
296	return true, nil
297}
298
299func (c *controlBuffer) get(block bool) (interface{}, error) {
300	for {
301		c.mu.Lock()
302		if c.err != nil {
303			c.mu.Unlock()
304			return nil, c.err
305		}
306		if !c.list.isEmpty() {
307			h := c.list.dequeue()
308			c.mu.Unlock()
309			return h, nil
310		}
311		if !block {
312			c.mu.Unlock()
313			return nil, nil
314		}
315		c.consumerWaiting = true
316		c.mu.Unlock()
317		select {
318		case <-c.ch:
319		case <-c.done:
320			c.finish()
321			return nil, ErrConnClosing
322		}
323	}
324}
325
326func (c *controlBuffer) finish() {
327	c.mu.Lock()
328	if c.err != nil {
329		c.mu.Unlock()
330		return
331	}
332	c.err = ErrConnClosing
333	// There may be headers for streams in the control buffer.
334	// These streams need to be cleaned out since the transport
335	// is still not aware of these yet.
336	for head := c.list.dequeueAll(); head != nil; head = head.next {
337		hdr, ok := head.it.(*headerFrame)
338		if !ok {
339			continue
340		}
341		if hdr.onOrphaned != nil { // It will be nil on the server-side.
342			hdr.onOrphaned(ErrConnClosing)
343		}
344	}
345	c.mu.Unlock()
346}
347
// side identifies which end of a connection a loopyWriter is working for;
// several handlers behave differently depending on it.
type side int

const (
	// clientSide marks a loopyWriter working for a client.
	clientSide side = iota
	// serverSide marks a loopyWriter working for a server.
	serverSide
)
354
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
//
// sendQuota is the remaining connection-level send quota: it shrinks as data
// is written and grows with incoming window updates for stream 0.
// draining is set on the client side when an incoming GOAWAY is handled, or
// from the result of ssGoAwayHandler when an outgoing goAway is handled.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}
387
388func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
389	var buf bytes.Buffer
390	l := &loopyWriter{
391		side:          s,
392		cbuf:          cbuf,
393		sendQuota:     defaultWindowSize,
394		oiws:          defaultWindowSize,
395		estdStreams:   make(map[uint32]*outStream),
396		activeStreams: newOutStreamList(),
397		framer:        fr,
398		hBuf:          &buf,
399		hEnc:          hpack.NewEncoder(&buf),
400		bdpEst:        bdpEst,
401	}
402	return l
403}
404
// minBatchSize is the threshold that run compares the framer writer's offset
// against (presumably a byte count — TODO confirm): if less than this has
// been buffered, run yields the processor once before flushing, to give
// other goroutines a chance to add to the batch.
const minBatchSize = 1000
406
407// run should be run in a separate goroutine.
408// It reads control frames from controlBuf and processes them by:
409// 1. Updating loopy's internal state, or/and
410// 2. Writing out HTTP2 frames on the wire.
411//
412// Loopy keeps all active streams with data to send in a linked-list.
413// All streams in the activeStreams linked-list must have both:
414// 1. Data to send, and
415// 2. Stream level flow control quota available.
416//
417// In each iteration of run loop, other than processing the incoming control
418// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
419// This results in writing of HTTP2 frames into an underlying write buffer.
420// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
421// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
422// if the batch size is too low to give stream goroutines a chance to fill it up.
423func (l *loopyWriter) run() (err error) {
424	defer func() {
425		if err == ErrConnClosing {
426			// Don't log ErrConnClosing as error since it happens
427			// 1. When the connection is closed by some other known issue.
428			// 2. User closed the connection.
429			// 3. A graceful close of connection.
430			infof("transport: loopyWriter.run returning. %v", err)
431			err = nil
432		}
433	}()
434	for {
435		it, err := l.cbuf.get(true)
436		if err != nil {
437			return err
438		}
439		if err = l.handle(it); err != nil {
440			return err
441		}
442		if _, err = l.processData(); err != nil {
443			return err
444		}
445		gosched := true
446	hasdata:
447		for {
448			it, err := l.cbuf.get(false)
449			if err != nil {
450				return err
451			}
452			if it != nil {
453				if err = l.handle(it); err != nil {
454					return err
455				}
456				if _, err = l.processData(); err != nil {
457					return err
458				}
459				continue hasdata
460			}
461			isEmpty, err := l.processData()
462			if err != nil {
463				return err
464			}
465			if !isEmpty {
466				continue hasdata
467			}
468			if gosched {
469				gosched = false
470				if l.framer.writer.offset < minBatchSize {
471					runtime.Gosched()
472					continue hasdata
473				}
474			}
475			l.framer.writer.Flush()
476			break hasdata
477
478		}
479	}
480}
481
482func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
483	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
484}
485
486func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
487	// Otherwise update the quota.
488	if w.streamID == 0 {
489		l.sendQuota += w.increment
490		return nil
491	}
492	// Find the stream and update it.
493	if str, ok := l.estdStreams[w.streamID]; ok {
494		str.bytesOutStanding -= int(w.increment)
495		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
496			str.state = active
497			l.activeStreams.enqueue(str)
498			return nil
499		}
500	}
501	return nil
502}
503
504func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
505	return l.framer.fr.WriteSettings(s.ss...)
506}
507
508func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
509	if err := l.applySettings(s.ss); err != nil {
510		return err
511	}
512	return l.framer.fr.WriteSettingsAck()
513}
514
515func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
516	str := &outStream{
517		id:    h.streamID,
518		state: empty,
519		itl:   &itemList{},
520		wq:    h.wq,
521	}
522	l.estdStreams[h.streamID] = str
523	return nil
524}
525
526func (l *loopyWriter) headerHandler(h *headerFrame) error {
527	if l.side == serverSide {
528		str, ok := l.estdStreams[h.streamID]
529		if !ok {
530			warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
531			return nil
532		}
533		// Case 1.A: Server is responding back with headers.
534		if !h.endStream {
535			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
536		}
537		// else:  Case 1.B: Server wants to close stream.
538
539		if str.state != empty { // either active or waiting on stream quota.
540			// add it str's list of items.
541			str.itl.enqueue(h)
542			return nil
543		}
544		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
545			return err
546		}
547		return l.cleanupStreamHandler(h.cleanup)
548	}
549	// Case 2: Client wants to originate stream.
550	str := &outStream{
551		id:    h.streamID,
552		state: empty,
553		itl:   &itemList{},
554		wq:    h.wq,
555	}
556	str.itl.enqueue(h)
557	return l.originateStream(str)
558}
559
560func (l *loopyWriter) originateStream(str *outStream) error {
561	hdr := str.itl.dequeue().(*headerFrame)
562	sendPing, err := hdr.initStream(str.id)
563	if err != nil {
564		if err == ErrConnClosing {
565			return err
566		}
567		// Other errors(errStreamDrain) need not close transport.
568		return nil
569	}
570	if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
571		return err
572	}
573	l.estdStreams[str.id] = str
574	if sendPing {
575		return l.pingHandler(&ping{data: [8]byte{}})
576	}
577	return nil
578}
579
580func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
581	if onWrite != nil {
582		onWrite()
583	}
584	l.hBuf.Reset()
585	for _, f := range hf {
586		if err := l.hEnc.WriteField(f); err != nil {
587			warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
588		}
589	}
590	var (
591		err               error
592		endHeaders, first bool
593	)
594	first = true
595	for !endHeaders {
596		size := l.hBuf.Len()
597		if size > http2MaxFrameLen {
598			size = http2MaxFrameLen
599		} else {
600			endHeaders = true
601		}
602		if first {
603			first = false
604			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
605				StreamID:      streamID,
606				BlockFragment: l.hBuf.Next(size),
607				EndStream:     endStream,
608				EndHeaders:    endHeaders,
609			})
610		} else {
611			err = l.framer.fr.WriteContinuation(
612				streamID,
613				endHeaders,
614				l.hBuf.Next(size),
615			)
616		}
617		if err != nil {
618			return err
619		}
620	}
621	return nil
622}
623
624func (l *loopyWriter) preprocessData(df *dataFrame) error {
625	str, ok := l.estdStreams[df.streamID]
626	if !ok {
627		return nil
628	}
629	// If we got data for a stream it means that
630	// stream was originated and the headers were sent out.
631	str.itl.enqueue(df)
632	if str.state == empty {
633		str.state = active
634		l.activeStreams.enqueue(str)
635	}
636	return nil
637}
638
639func (l *loopyWriter) pingHandler(p *ping) error {
640	if !p.ack {
641		l.bdpEst.timesnap(p.data)
642	}
643	return l.framer.fr.WritePing(p.ack, p.data)
644
645}
646
647func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
648	o.resp <- l.sendQuota
649	return nil
650}
651
652func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
653	c.onWrite()
654	if str, ok := l.estdStreams[c.streamID]; ok {
655		// On the server side it could be a trailers-only response or
656		// a RST_STREAM before stream initialization thus the stream might
657		// not be established yet.
658		delete(l.estdStreams, c.streamID)
659		str.deleteSelf()
660	}
661	if c.rst { // If RST_STREAM needs to be sent.
662		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
663			return err
664		}
665	}
666	if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
667		return ErrConnClosing
668	}
669	return nil
670}
671
672func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
673	if l.side == clientSide {
674		l.draining = true
675		if len(l.estdStreams) == 0 {
676			return ErrConnClosing
677		}
678	}
679	return nil
680}
681
682func (l *loopyWriter) goAwayHandler(g *goAway) error {
683	// Handling of outgoing GoAway is very specific to side.
684	if l.ssGoAwayHandler != nil {
685		draining, err := l.ssGoAwayHandler(g)
686		if err != nil {
687			return err
688		}
689		l.draining = draining
690	}
691	return nil
692}
693
694func (l *loopyWriter) handle(i interface{}) error {
695	switch i := i.(type) {
696	case *incomingWindowUpdate:
697		return l.incomingWindowUpdateHandler(i)
698	case *outgoingWindowUpdate:
699		return l.outgoingWindowUpdateHandler(i)
700	case *incomingSettings:
701		return l.incomingSettingsHandler(i)
702	case *outgoingSettings:
703		return l.outgoingSettingsHandler(i)
704	case *headerFrame:
705		return l.headerHandler(i)
706	case *registerStream:
707		return l.registerStreamHandler(i)
708	case *cleanupStream:
709		return l.cleanupStreamHandler(i)
710	case *incomingGoAway:
711		return l.incomingGoAwayHandler(i)
712	case *dataFrame:
713		return l.preprocessData(i)
714	case *ping:
715		return l.pingHandler(i)
716	case *goAway:
717		return l.goAwayHandler(i)
718	case *outFlowControlSizeRequest:
719		return l.outFlowControlSizeRequestHandler(i)
720	default:
721		return fmt.Errorf("transport: unknown control message type %T", i)
722	}
723}
724
725func (l *loopyWriter) applySettings(ss []http2.Setting) error {
726	for _, s := range ss {
727		switch s.ID {
728		case http2.SettingInitialWindowSize:
729			o := l.oiws
730			l.oiws = s.Val
731			if o < l.oiws {
732				// If the new limit is greater make all depleted streams active.
733				for _, stream := range l.estdStreams {
734					if stream.state == waitingOnStreamQuota {
735						stream.state = active
736						l.activeStreams.enqueue(stream)
737					}
738				}
739			}
740		case http2.SettingHeaderTableSize:
741			updateHeaderTblSize(l.hEnc, s.Val)
742		}
743	}
744	return nil
745}
746
747// processData removes the first stream from active streams, writes out at most 16KB
748// of its data and then puts it at the end of activeStreams if there's still more data
749// to be sent and stream has some stream-level flow control.
750func (l *loopyWriter) processData() (bool, error) {
751	if l.sendQuota == 0 {
752		return true, nil
753	}
754	str := l.activeStreams.dequeue() // Remove the first stream.
755	if str == nil {
756		return true, nil
757	}
758	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
759	// A data item is represented by a dataFrame, since it later translates into
760	// multiple HTTP2 data frames.
761	// Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
762	// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
763	// maximum possilbe HTTP2 frame size.
764
765	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
766		// Client sends out empty data frame with endStream = true
767		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
768			return false, err
769		}
770		str.itl.dequeue() // remove the empty data item from stream
771		if str.itl.isEmpty() {
772			str.state = empty
773		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
774			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
775				return false, err
776			}
777			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
778				return false, nil
779			}
780		} else {
781			l.activeStreams.enqueue(str)
782		}
783		return false, nil
784	}
785	var (
786		idx int
787		buf []byte
788	)
789	if len(dataItem.h) != 0 { // data header has not been written out yet.
790		buf = dataItem.h
791	} else {
792		idx = 1
793		buf = dataItem.d
794	}
795	size := http2MaxFrameLen
796	if len(buf) < size {
797		size = len(buf)
798	}
799	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
800		str.state = waitingOnStreamQuota
801		return false, nil
802	} else if strQuota < size {
803		size = strQuota
804	}
805
806	if l.sendQuota < uint32(size) { // connection-level flow control.
807		size = int(l.sendQuota)
808	}
809	// Now that outgoing flow controls are checked we can replenish str's write quota
810	str.wq.replenish(size)
811	var endStream bool
812	// If this is the last data message on this stream and all of it can be written in this iteration.
813	if dataItem.endStream && size == len(buf) {
814		// buf contains either data or it contains header but data is empty.
815		if idx == 1 || len(dataItem.d) == 0 {
816			endStream = true
817		}
818	}
819	if dataItem.onEachWrite != nil {
820		dataItem.onEachWrite()
821	}
822	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
823		return false, err
824	}
825	buf = buf[size:]
826	str.bytesOutStanding += size
827	l.sendQuota -= uint32(size)
828	if idx == 0 {
829		dataItem.h = buf
830	} else {
831		dataItem.d = buf
832	}
833
834	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
835		str.itl.dequeue()
836	}
837	if str.itl.isEmpty() {
838		str.state = empty
839	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
840		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
841			return false, err
842		}
843		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
844			return false, err
845		}
846	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
847		str.state = waitingOnStreamQuota
848	} else { // Otherwise add it back to the list of active streams.
849		l.activeStreams.enqueue(str)
850	}
851	return false, nil
852}
853