/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
)

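// updateHeaderTblSize caps the HPACK encoder's dynamic table size in response
// to a peer-advertised SETTINGS_HEADER_TABLE_SIZE (see applySettings). It is a
// package-level variable, presumably so tests can stub it out.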
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
	e.SetMaxDynamicTableSizeLimit(v)
}

type itemNode struct {
	it   interface{}
	next *itemNode
}

type itemList struct {
	head *itemNode
	tail *itemNode
}

func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list.
func (il *itemList) peek() interface{} {
	return il.head.it
}

func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		il.tail = nil
	}
	return i
}

func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

func (il *itemList) isEmpty() bool {
	return il.head == nil
}
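
// A minimal usage sketch of itemList (illustrative only; the values enqueued
// here are placeholders, not real control frames):
//
//	var il itemList
//	il.enqueue("first")
//	il.enqueue("second")
//	for !il.isEmpty() {
//		_ = il.dequeue() // "first", then "second"
//	}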

// The following defines various control items which could flow through
// the control buffer of the transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.

// maxQueuedTransportResponseFrames is the maximum number of queued "transport
// response" frames we will buffer before preventing new reads from occurring
// on the transport.  These are control frames sent in response to client
// requests, such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

type cbItem interface {
	isTransportResponseFrame() bool
}

// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
	streamID uint32
	wq       *writeQuota
}

func (*registerStream) isTransportResponseFrame() bool { return false }

// headerFrame is also used to register a stream on the client side.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool               // Valid on server side.
	initStream func(uint32) error // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on the client side.
}

func (h *headerFrame) isTransportResponseFrame() bool {
	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}

type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}

func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM

type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }

type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }

type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}

type incomingSettings struct {
	ss []http2.Setting
}

func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK

type outgoingSettings struct {
	ss []http2.Setting
}

func (*outgoingSettings) isTransportResponseFrame() bool { return false }

type incomingGoAway struct {
}

func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn bool
}

func (*goAway) isTransportResponseFrame() bool { return false }

type ping struct {
	ack  bool
	data [8]byte
}

func (*ping) isTransportResponseFrame() bool { return true }

type outFlowControlSizeRequest struct {
	resp chan uint32
}

func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }

type outStreamState int

const (
	active outStreamState = iota
	empty
	waitingOnStreamQuota
)
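
// An outStream moves between these states as follows (based on the handling in
// loopy below): it becomes active when it has data queued and available
// stream-level quota, empty when its item list drains, and
// waitingOnStreamQuota when it still has data queued but has exhausted its
// stream-level flow control quota.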

type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota

	next *outStream
	prev *outStream
}

func (s *outStream) deleteSelf() {
	if s.prev != nil {
		s.prev.next = s.next
	}
	if s.next != nil {
		s.next.prev = s.prev
	}
	s.next, s.prev = nil, nil
}

type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}

func newOutStreamList() *outStreamList {
	head, tail := new(outStream), new(outStream)
	head.next = tail
	tail.prev = head
	return &outStreamList{
		head: head,
		tail: tail,
	}
}

func (l *outStreamList) enqueue(s *outStream) {
	e := l.tail.prev
	e.next = s
	s.prev = e
	s.next = l.tail
	l.tail.prev = s
}

// dequeue removes an outStream from the beginning of the list.
func (l *outStreamList) dequeue() *outStream {
	b := l.head.next
	if b == l.tail {
		return nil
	}
	b.deleteSelf()
	return b
}
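
// An illustrative sketch of the sentinel-based list (the stream below is a
// placeholder, not real transport state): removal never has to touch the
// list's head or tail pointers, which is what makes deleteSelf O(1).
//
//	l := newOutStreamList()
//	s := &outStream{id: 3, itl: &itemList{}}
//	l.enqueue(s)   // head <-> s <-> tail
//	s.deleteSelf() // head <-> tail; s is unlinked without consulting l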

// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on the wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer.  trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold.  Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // *chan struct{}
}

func newControlBuffer(done <-chan struct{}) *controlBuffer {
	return &controlBuffer{
		ch:   make(chan struct{}, 1),
		list: &itemList{},
		done: done,
	}
}

// throttle blocks if too many transport response frames (e.g., incomingSettings
// or cleanupStreams) are queued in the controlbuf.
func (c *controlBuffer) throttle() {
	ch, _ := c.trfChan.Load().(*chan struct{})
	if ch != nil {
		select {
		case <-*ch:
		case <-c.done:
		}
	}
}
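
// A rough sketch of how the two sides of the control buffer are used
// (illustrative only; the real call sites live in the HTTP/2 client and server
// transports in this package):
//
//	// reader goroutine: back off if too many transport response frames are
//	// already queued, then hand new work to loopy.
//	cbuf.throttle()
//	_ = cbuf.put(&incomingSettings{ss: settings})
//
//	// loopy writer goroutine: block until the next item or an error.
//	it, err := cbuf.get(true)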

func (c *controlBuffer) put(it cbItem) error {
	_, err := c.executeAndPut(nil, it)
	return err
}

func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
	var wakeUp bool
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if f != nil {
		if !f(it) { // f wasn't successful
			c.mu.Unlock()
			return false, nil
		}
	}
	if c.consumerWaiting {
		wakeUp = true
		c.consumerWaiting = false
	}
	c.list.enqueue(it)
	if it.isTransportResponseFrame() {
		c.transportResponseFrames++
		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
			// We are adding the frame that puts us over the threshold; create
			// a throttling channel.
			ch := make(chan struct{})
			c.trfChan.Store(&ch)
		}
	}
	c.mu.Unlock()
	if wakeUp {
		select {
		case c.ch <- struct{}{}:
		default:
		}
	}
	return true, nil
}

// Note argument f should never be nil.
func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if !f(it) { // f wasn't successful
		c.mu.Unlock()
		return false, nil
	}
	c.mu.Unlock()
	return true, nil
}

func (c *controlBuffer) get(block bool) (interface{}, error) {
	for {
		c.mu.Lock()
		if c.err != nil {
			c.mu.Unlock()
			return nil, c.err
		}
		if !c.list.isEmpty() {
			h := c.list.dequeue().(cbItem)
			if h.isTransportResponseFrame() {
				if c.transportResponseFrames == maxQueuedTransportResponseFrames {
					// We are removing the frame that put us over the
					// threshold; close and clear the throttling channel.
					ch := c.trfChan.Load().(*chan struct{})
					close(*ch)
					c.trfChan.Store((*chan struct{})(nil))
				}
				c.transportResponseFrames--
			}
			c.mu.Unlock()
			return h, nil
		}
		if !block {
			c.mu.Unlock()
			return nil, nil
		}
		c.consumerWaiting = true
		c.mu.Unlock()
		select {
		case <-c.ch:
		case <-c.done:
			c.finish()
			return nil, ErrConnClosing
		}
	}
}

func (c *controlBuffer) finish() {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return
	}
	c.err = ErrConnClosing
	// There may be headers for streams in the control buffer.
	// These streams need to be cleaned out since the transport
	// is not yet aware of them.
	for head := c.list.dequeueAll(); head != nil; head = head.next {
		hdr, ok := head.it.(*headerFrame)
		if !ok {
			continue
		}
		if hdr.onOrphaned != nil { // It will be nil on the server side.
			hdr.onOrphaned(ErrConnClosing)
		}
	}
	c.mu.Unlock()
}

type side int

const (
	clientSide side = iota
	serverSide
)

// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it adds them to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is a map of all established streams that are not cleaned up yet.
	// On the client side, this is all streams whose headers were sent out.
	// On the server side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally has a list of data items (and perhaps trailers
	// on the server side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
	var buf bytes.Buffer
	l := &loopyWriter{
		side:          s,
		cbuf:          cbuf,
		sendQuota:     defaultWindowSize,
		oiws:          defaultWindowSize,
		estdStreams:   make(map[uint32]*outStream),
		activeStreams: newOutStreamList(),
		framer:        fr,
		hBuf:          &buf,
		hEnc:          hpack.NewEncoder(&buf),
		bdpEst:        bdpEst,
	}
	return l
}

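// minBatchSize is the (approximate) number of bytes loopy wants in the
// framer's write buffer before flushing; when the batch is smaller, run yields
// the processor once so that stream goroutines can queue more data (see the
// batching note on run below).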
const minBatchSize = 1000

// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, and/or
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream-level flow control quota available.
//
// In each iteration of the run loop, besides processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in HTTP2 frames being written into an underlying write buffer.
// When there are no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size of each flush, loopy yields the processor once
// if the batch size is too low, to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
	defer func() {
		if err == ErrConnClosing {
			// Don't log ErrConnClosing as an error since it happens when:
			// 1. The connection is closed by some other known issue.
			// 2. The user closed the connection.
			// 3. The connection is closed gracefully.
			infof("transport: loopyWriter.run returning. %v", err)
			err = nil
		}
	}()
	for {
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		gosched := true
	hasdata:
		for {
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}

func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}

func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
	// A window update with stream ID 0 is for the connection-level quota.
	if w.streamID == 0 {
		l.sendQuota += w.increment
		return nil
	}
	// Find the stream and update it.
	if str, ok := l.estdStreams[w.streamID]; ok {
		str.bytesOutStanding -= int(w.increment)
		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
			str.state = active
			l.activeStreams.enqueue(str)
			return nil
		}
	}
	return nil
}

func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
	return l.framer.fr.WriteSettings(s.ss...)
}

func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
	if err := l.applySettings(s.ss); err != nil {
		return err
	}
	return l.framer.fr.WriteSettingsAck()
}

func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	l.estdStreams[h.streamID] = str
	return nil
}

func (l *loopyWriter) headerHandler(h *headerFrame) error {
	if l.side == serverSide {
		str, ok := l.estdStreams[h.streamID]
		if !ok {
			warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
			return nil
		}
		// Case 1.A: Server is responding back with headers.
		if !h.endStream {
			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
		}
		// else: Case 1.B: Server wants to close the stream.

		if str.state != empty { // either active or waiting on stream quota.
			// add it to str's list of items.
			str.itl.enqueue(h)
			return nil
		}
		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
			return err
		}
		return l.cleanupStreamHandler(h.cleanup)
	}
	// Case 2: Client wants to originate a stream.
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	str.itl.enqueue(h)
	return l.originateStream(str)
}

func (l *loopyWriter) originateStream(str *outStream) error {
	hdr := str.itl.dequeue().(*headerFrame)
	if err := hdr.initStream(str.id); err != nil {
		if err == ErrConnClosing {
			return err
		}
		// Other errors (errStreamDrain) need not close the transport.
		return nil
	}
	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
		return err
	}
	l.estdStreams[str.id] = str
	return nil
}

func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
	if onWrite != nil {
		onWrite()
	}
	l.hBuf.Reset()
	for _, f := range hf {
		if err := l.hEnc.WriteField(f); err != nil {
			warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
		}
	}
	var (
		err               error
		endHeaders, first bool
	)
	first = true
	for !endHeaders {
		size := l.hBuf.Len()
		if size > http2MaxFrameLen {
			size = http2MaxFrameLen
		} else {
			endHeaders = true
		}
		if first {
			first = false
			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
				StreamID:      streamID,
				BlockFragment: l.hBuf.Next(size),
				EndStream:     endStream,
				EndHeaders:    endHeaders,
			})
		} else {
			err = l.framer.fr.WriteContinuation(
				streamID,
				endHeaders,
				l.hBuf.Next(size),
			)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

func (l *loopyWriter) preprocessData(df *dataFrame) error {
	str, ok := l.estdStreams[df.streamID]
	if !ok {
		return nil
	}
	// If we got data for a stream it means that
	// stream was originated and the headers were sent out.
	str.itl.enqueue(df)
	if str.state == empty {
		str.state = active
		l.activeStreams.enqueue(str)
	}
	return nil
}

func (l *loopyWriter) pingHandler(p *ping) error {
	if !p.ack {
		l.bdpEst.timesnap(p.data)
	}
	return l.framer.fr.WritePing(p.ack, p.data)
}

func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
	o.resp <- l.sendQuota
	return nil
}

func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
	c.onWrite()
	if str, ok := l.estdStreams[c.streamID]; ok {
		// On the server side it could be a trailers-only response or
		// a RST_STREAM before stream initialization, so the stream might
		// not be established yet.
		delete(l.estdStreams, c.streamID)
		str.deleteSelf()
	}
	if c.rst { // If RST_STREAM needs to be sent.
		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
			return err
		}
	}
	if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
		return ErrConnClosing
	}
	return nil
}

func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
	if l.side == clientSide {
		l.draining = true
		if len(l.estdStreams) == 0 {
			return ErrConnClosing
		}
	}
	return nil
}

func (l *loopyWriter) goAwayHandler(g *goAway) error {
	// Handling of an outgoing GoAway is very specific to the side.
	if l.ssGoAwayHandler != nil {
		draining, err := l.ssGoAwayHandler(g)
		if err != nil {
			return err
		}
		l.draining = draining
	}
	return nil
}

func (l *loopyWriter) handle(i interface{}) error {
	switch i := i.(type) {
	case *incomingWindowUpdate:
		return l.incomingWindowUpdateHandler(i)
	case *outgoingWindowUpdate:
		return l.outgoingWindowUpdateHandler(i)
	case *incomingSettings:
		return l.incomingSettingsHandler(i)
	case *outgoingSettings:
		return l.outgoingSettingsHandler(i)
	case *headerFrame:
		return l.headerHandler(i)
	case *registerStream:
		return l.registerStreamHandler(i)
	case *cleanupStream:
		return l.cleanupStreamHandler(i)
	case *incomingGoAway:
		return l.incomingGoAwayHandler(i)
	case *dataFrame:
		return l.preprocessData(i)
	case *ping:
		return l.pingHandler(i)
	case *goAway:
		return l.goAwayHandler(i)
	case *outFlowControlSizeRequest:
		return l.outFlowControlSizeRequestHandler(i)
	default:
		return fmt.Errorf("transport: unknown control message type %T", i)
	}
}

func (l *loopyWriter) applySettings(ss []http2.Setting) error {
	for _, s := range ss {
		switch s.ID {
		case http2.SettingInitialWindowSize:
			o := l.oiws
			l.oiws = s.Val
			if o < l.oiws {
				// If the new limit is greater, make all depleted streams active.
				for _, stream := range l.estdStreams {
					if stream.state == waitingOnStreamQuota {
						stream.state = active
						l.activeStreams.enqueue(stream)
					}
				}
			}
		case http2.SettingHeaderTableSize:
			updateHeaderTblSize(l.hEnc, s.Val)
		}
	}
	return nil
}

// processData removes the first stream from active streams, writes out at most 16KB
// of its data and then puts it at the end of activeStreams if there's still more data
// to be sent and the stream has some stream-level flow control quota left.
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item of this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two buffers: h, which keeps the grpc-message header, and
	// d, which is the actual data.
	// As an optimization to keep wire traffic low, data from d is copied to h to make
	// a frame as big as the maximum possible HTTP2 frame size.

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
		// Client sends out an empty data frame with endStream = true
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from stream
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}
	var (
		idx int
		buf []byte
	)
	if len(dataItem.h) != 0 { // data header has not been written out yet.
		buf = dataItem.h
	} else {
		idx = 1
		buf = dataItem.d
	}
	size := http2MaxFrameLen
	if len(buf) < size {
		size = len(buf)
	}
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if strQuota < size {
		size = strQuota
	}

	if l.sendQuota < uint32(size) { // connection-level flow control.
		size = int(l.sendQuota)
	}
	// Now that outgoing flow controls are checked we can replenish str's write quota.
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && size == len(buf) {
		// buf contains either data or it contains header but data is empty.
		if idx == 1 || len(dataItem.d) == 0 {
			endStream = true
		}
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
		return false, err
	}
	buf = buf[size:]
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	if idx == 0 {
		dataItem.h = buf
	} else {
		dataItem.d = buf
	}

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}