/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"errors"
	"fmt"
	"runtime"
	"strconv"
	"sync"
	"sync/atomic"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/internal/grpcutil"
	"google.golang.org/grpc/status"
)

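// updateHeaderTblSize sets the limit on the HPACK encoder's dynamic table
// size. It is declared as a var so that it can be swapped out (e.g., in
// tests).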
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
	e.SetMaxDynamicTableSizeLimit(v)
}

type itemNode struct {
	it   interface{}
	next *itemNode
}

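// itemList is a singly linked FIFO list of control items.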
type itemList struct {
	head *itemNode
	tail *itemNode
}

func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list. The list must not be empty.
func (il *itemList) peek() interface{} {
	return il.head.it
}

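// dequeue removes and returns the item at the head of the list, or nil if
// the list is empty.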
func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		il.tail = nil
	}
	return i
}

func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

func (il *itemList) isEmpty() bool {
	return il.head == nil
}

// The following define various control items which can flow through
// the control buffer of the transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.

// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport.  These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

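// cbItem is implemented by all control buffer items. isTransportResponseFrame
// reports whether the item is a direct response to something initiated by the
// peer (e.g., a settings ack or RST_STREAM) and therefore counts toward the
// maxQueuedTransportResponseFrames throttling threshold.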
type cbItem interface {
	isTransportResponseFrame() bool
}

// registerStream is used to register an incoming stream with the loopy writer.
type registerStream struct {
	streamID uint32
	wq       *writeQuota
}

func (*registerStream) isTransportResponseFrame() bool { return false }

// headerFrame is also used to register a stream on the client side.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool               // Valid on the server side.
	initStream func(uint32) error // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on the client side.
}

func (h *headerFrame) isTransportResponseFrame() bool {
	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}

type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}

func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM

type earlyAbortStream struct {
	streamID       uint32
	contentSubtype string
	status         *status.Status
}

func (*earlyAbortStream) isTransportResponseFrame() bool { return false }

type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }

type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }

type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}

type incomingSettings struct {
	ss []http2.Setting
}

func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK

type outgoingSettings struct {
	ss []http2.Setting
}

func (*outgoingSettings) isTransportResponseFrame() bool { return false }

type incomingGoAway struct{}

func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn bool
}

func (*goAway) isTransportResponseFrame() bool { return false }

type ping struct {
	ack  bool
	data [8]byte
}

func (*ping) isTransportResponseFrame() bool { return true }

type outFlowControlSizeRequest struct {
	resp chan uint32
}

func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }

type outStreamState int

const (
	active outStreamState = iota
	empty
	waitingOnStreamQuota
)

type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota

	next *outStream
	prev *outStream
}

// deleteSelf unlinks s from whichever outStreamList it is on, in O(1) time.
func (s *outStream) deleteSelf() {
	if s.prev != nil {
		s.prev.next = s.next
	}
	if s.next != nil {
		s.next.prev = s.prev
	}
	s.next, s.prev = nil, nil
}

type outStreamList struct {
	// The following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}

func newOutStreamList() *outStreamList {
	head, tail := new(outStream), new(outStream)
	head.next = tail
	tail.prev = head
	return &outStreamList{
		head: head,
		tail: tail,
	}
}

// enqueue adds s to the end of the list.
func (l *outStreamList) enqueue(s *outStream) {
	e := l.tail.prev
	e.next = s
	s.prev = e
	s.next = l.tail
	l.tail.prev = s
}

// dequeue removes an outStream from the beginning of the list, or returns nil
// if the list is empty.
func (l *outStreamList) dequeue() *outStream {
	b := l.head.next
	if b == l.tail {
		return nil
	}
	b.deleteSelf()
	return b
}

// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on the wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer.  trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold.  Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // chan struct{}
}

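// newControlBuffer returns a controlBuffer whose blocking operations are
// aborted when the provided done channel is closed.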
func newControlBuffer(done <-chan struct{}) *controlBuffer {
	return &controlBuffer{
		ch:   make(chan struct{}, 1),
		list: &itemList{},
		done: done,
	}
}

// throttle blocks if there are too many queued transport response frames
// (e.g., incomingSettings or cleanupStream items that trigger a RST_STREAM)
// in the controlbuf.
func (c *controlBuffer) throttle() {
	ch, _ := c.trfChan.Load().(chan struct{})
	if ch != nil {
		select {
		case <-ch:
		case <-c.done:
		}
	}
}

func (c *controlBuffer) put(it cbItem) error {
	_, err := c.executeAndPut(nil, it)
	return err
}

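// executeAndPut runs f on it while holding the lock and enqueues it only if f
// is nil or returns true. It wakes up a blocked get(true) caller if one is
// waiting, and starts throttling once the number of queued transport response
// frames reaches maxQueuedTransportResponseFrames. It returns false if the
// buffer has already been closed or if f returned false.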
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
	var wakeUp bool
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if f != nil {
		if !f(it) { // f wasn't successful
			c.mu.Unlock()
			return false, nil
		}
	}
	if c.consumerWaiting {
		wakeUp = true
		c.consumerWaiting = false
	}
	c.list.enqueue(it)
	if it.isTransportResponseFrame() {
		c.transportResponseFrames++
		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
			// We are adding the frame that puts us over the threshold; create
			// a throttling channel.
			c.trfChan.Store(make(chan struct{}))
		}
	}
	c.mu.Unlock()
	if wakeUp {
		select {
		case c.ch <- struct{}{}:
		default:
		}
	}
	return true, nil
}

// execute runs f on it while holding the lock, without enqueuing the item.
// The argument f must never be nil.
func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if !f(it) { // f wasn't successful
		c.mu.Unlock()
		return false, nil
	}
	c.mu.Unlock()
	return true, nil
}

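// get returns the next item from the control buffer. If block is true it
// blocks until an item becomes available or the transport is done; otherwise
// it returns nil immediately when the buffer is empty. Dequeuing a transport
// response frame that drops the count below the threshold closes and clears
// the throttling channel.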
func (c *controlBuffer) get(block bool) (interface{}, error) {
	for {
		c.mu.Lock()
		if c.err != nil {
			c.mu.Unlock()
			return nil, c.err
		}
		if !c.list.isEmpty() {
			h := c.list.dequeue().(cbItem)
			if h.isTransportResponseFrame() {
				if c.transportResponseFrames == maxQueuedTransportResponseFrames {
					// We are removing the frame that put us over the
					// threshold; close and clear the throttling channel.
					ch := c.trfChan.Load().(chan struct{})
					close(ch)
					c.trfChan.Store((chan struct{})(nil))
				}
				c.transportResponseFrames--
			}
			c.mu.Unlock()
			return h, nil
		}
		if !block {
			c.mu.Unlock()
			return nil, nil
		}
		c.consumerWaiting = true
		c.mu.Unlock()
		select {
		case <-c.ch:
		case <-c.done:
			return nil, ErrConnClosing
		}
	}
}

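// finish closes the control buffer with ErrConnClosing, notifies any queued
// headerFrames of orphaned streams via onOrphaned, and unblocks a pending
// throttle() call so the reader can observe the connection error and close
// the transport.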
func (c *controlBuffer) finish() {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return
	}
	c.err = ErrConnClosing
	// There may be headers for streams in the control buffer.
	// These streams need to be cleaned out since the transport
	// is not aware of them yet.
	for head := c.list.dequeueAll(); head != nil; head = head.next {
		hdr, ok := head.it.(*headerFrame)
		if !ok {
			continue
		}
		if hdr.onOrphaned != nil { // It will be nil on the server-side.
			hdr.onOrphaned(ErrConnClosing)
		}
	}
	// In case throttle() is currently in flight, it needs to be unblocked.
	// Otherwise, the transport may not close, since the transport is closed by
	// the reader encountering the connection error.
	ch, _ := c.trfChan.Load().(chan struct{})
	if ch != nil {
		close(ch)
	}
	c.trfChan.Store((chan struct{})(nil))
	c.mu.Unlock()
}

type side int

const (
	clientSide side = iota
	serverSide
)

// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// they are added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is a map of all established streams that are not cleaned-up yet.
	// On the client side, this is all streams whose headers were sent out.
	// On the server side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally has a list of data items (and perhaps trailers
	// on the server side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}

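// newLoopyWriter returns a loopy writer for the given side that reads from
// cbuf and writes frames using fr, with connection and stream windows
// initialized to defaultWindowSize.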
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
	var buf bytes.Buffer
	l := &loopyWriter{
		side:          s,
		cbuf:          cbuf,
		sendQuota:     defaultWindowSize,
		oiws:          defaultWindowSize,
		estdStreams:   make(map[uint32]*outStream),
		activeStreams: newOutStreamList(),
		framer:        fr,
		hBuf:          &buf,
		hEnc:          hpack.NewEncoder(&buf),
		bdpEst:        bdpEst,
	}
	return l
}

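// minBatchSize is the batch size, in bytes, below which loopy yields the
// processor once instead of flushing, to give stream goroutines a chance to
// fill up the write buffer.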
const minBatchSize = 1000

// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, and/or
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of the run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there are no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor once,
// if the batch size is too low, to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
	defer func() {
		if err == ErrConnClosing {
			// Don't log ErrConnClosing as error since it happens
			// 1. When the connection is closed by some other known issue.
			// 2. User closed the connection.
			// 3. A graceful close of connection.
			if logger.V(logLevel) {
				logger.Infof("transport: loopyWriter.run returning. %v", err)
			}
			err = nil
		}
	}()
	for {
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		gosched := true
	hasdata:
		for {
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}

func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}

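// incomingWindowUpdateHandler replenishes the connection-level send quota for
// a WINDOW_UPDATE on stream 0, or the stream-level quota otherwise; a stream
// that was blocked waiting on quota is moved back onto the active list.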
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
	if w.streamID == 0 {
		// Update the connection-level flow control quota.
		l.sendQuota += w.increment
		return nil
	}
	// Otherwise, find the stream and update its quota.
	if str, ok := l.estdStreams[w.streamID]; ok {
		str.bytesOutStanding -= int(w.increment)
		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
			str.state = active
			l.activeStreams.enqueue(str)
			return nil
		}
	}
	return nil
}

func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
	return l.framer.fr.WriteSettings(s.ss...)
}

func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
	if err := l.applySettings(s.ss); err != nil {
		return err
	}
	return l.framer.fr.WriteSettingsAck()
}

func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	l.estdStreams[h.streamID] = str
	return nil
}

func (l *loopyWriter) headerHandler(h *headerFrame) error {
	if l.side == serverSide {
		str, ok := l.estdStreams[h.streamID]
		if !ok {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
			}
			return nil
		}
		// Case 1.A: Server is responding with headers.
		if !h.endStream {
			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
		}
		// else: Case 1.B: Server wants to close the stream.

		if str.state != empty { // either active or waiting on stream quota.
			// add it to str's list of items.
			str.itl.enqueue(h)
			return nil
		}
		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
			return err
		}
		return l.cleanupStreamHandler(h.cleanup)
	}
	// Case 2: Client wants to originate a stream.
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	str.itl.enqueue(h)
	return l.originateStream(str)
}

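// originateStream dequeues the headerFrame queued by headerHandler, runs its
// initStream callback for the new client-side stream, writes the headers, and
// tracks the stream in estdStreams on success.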
func (l *loopyWriter) originateStream(str *outStream) error {
	hdr := str.itl.dequeue().(*headerFrame)
	if err := hdr.initStream(str.id); err != nil {
		if err == ErrConnClosing {
			return err
		}
		// Other errors (e.g. errStreamDrain) need not close the transport.
		return nil
	}
	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
		return err
	}
	l.estdStreams[str.id] = str
	return nil
}

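// writeHeader HPACK-encodes hf and writes it out for streamID, splitting the
// block into a HEADERS frame followed by CONTINUATION frames whenever it
// exceeds http2MaxFrameLen.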
func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
	if onWrite != nil {
		onWrite()
	}
	l.hBuf.Reset()
	for _, f := range hf {
		if err := l.hEnc.WriteField(f); err != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
			}
		}
	}
	var (
		err               error
		endHeaders, first bool
	)
	first = true
	for !endHeaders {
		size := l.hBuf.Len()
		if size > http2MaxFrameLen {
			size = http2MaxFrameLen
		} else {
			endHeaders = true
		}
		if first {
			first = false
			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
				StreamID:      streamID,
				BlockFragment: l.hBuf.Next(size),
				EndStream:     endStream,
				EndHeaders:    endHeaders,
			})
		} else {
			err = l.framer.fr.WriteContinuation(
				streamID,
				endHeaders,
				l.hBuf.Next(size),
			)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

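// preprocessData queues an outgoing data frame on its stream's item list and,
// if the stream was idle, marks it active and adds it to the active-streams
// list so processData will pick it up.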
func (l *loopyWriter) preprocessData(df *dataFrame) error {
	str, ok := l.estdStreams[df.streamID]
	if !ok {
		return nil
	}
	// If we got data for a stream, it means that the stream was originated
	// and its headers were sent out.
	str.itl.enqueue(df)
	if str.state == empty {
		str.state = active
		l.activeStreams.enqueue(str)
	}
	return nil
}

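// pingHandler records a timestamp for BDP estimation on outgoing (non-ack)
// pings and writes the PING frame on the wire.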
func (l *loopyWriter) pingHandler(p *ping) error {
	if !p.ack {
		l.bdpEst.timesnap(p.data)
	}
	return l.framer.fr.WritePing(p.ack, p.data)
}

func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
	o.resp <- l.sendQuota
	return nil
}

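// cleanupStreamHandler removes the stream from loopy's bookkeeping, writes a
// RST_STREAM if one was requested, and, on a draining client with no
// remaining streams, signals that the connection should be closed.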
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
	c.onWrite()
	if str, ok := l.estdStreams[c.streamID]; ok {
		// On the server side it could be a trailers-only response or
		// a RST_STREAM before stream initialization, thus the stream
		// might not be established yet.
		delete(l.estdStreams, c.streamID)
		str.deleteSelf()
	}
	if c.rst { // If RST_STREAM needs to be sent.
		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
			return err
		}
	}
	if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
		return ErrConnClosing
	}
	return nil
}

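// earlyAbortStreamHandler is used on the server side to reject a stream
// before any application data is sent: it writes a trailers-only response
// carrying the given gRPC status with END_STREAM set.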
func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
	if l.side == clientSide {
		return errors.New("earlyAbortStream not handled on client")
	}

	headerFields := []hpack.HeaderField{
		{Name: ":status", Value: "200"},
		{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
		{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
		{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
	}

	if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
		return err
	}
	return nil
}

func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
	if l.side == clientSide {
		l.draining = true
		if len(l.estdStreams) == 0 {
			return ErrConnClosing
		}
	}
	return nil
}

func (l *loopyWriter) goAwayHandler(g *goAway) error {
	// Handling of outgoing GoAway is very specific to the side.
	if l.ssGoAwayHandler != nil {
		draining, err := l.ssGoAwayHandler(g)
		if err != nil {
			return err
		}
		l.draining = draining
	}
	return nil
}

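// handle dispatches a control item to its type-specific handler.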
func (l *loopyWriter) handle(i interface{}) error {
	switch i := i.(type) {
	case *incomingWindowUpdate:
		return l.incomingWindowUpdateHandler(i)
	case *outgoingWindowUpdate:
		return l.outgoingWindowUpdateHandler(i)
	case *incomingSettings:
		return l.incomingSettingsHandler(i)
	case *outgoingSettings:
		return l.outgoingSettingsHandler(i)
	case *headerFrame:
		return l.headerHandler(i)
	case *registerStream:
		return l.registerStreamHandler(i)
	case *cleanupStream:
		return l.cleanupStreamHandler(i)
	case *earlyAbortStream:
		return l.earlyAbortStreamHandler(i)
	case *incomingGoAway:
		return l.incomingGoAwayHandler(i)
	case *dataFrame:
		return l.preprocessData(i)
	case *ping:
		return l.pingHandler(i)
	case *goAway:
		return l.goAwayHandler(i)
	case *outFlowControlSizeRequest:
		return l.outFlowControlSizeRequestHandler(i)
	default:
		return fmt.Errorf("transport: unknown control message type %T", i)
	}
}

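// applySettings applies the peer's SETTINGS that loopy cares about: a larger
// initial window size re-activates streams that were blocked on stream-level
// quota, and SETTINGS_HEADER_TABLE_SIZE adjusts the HPACK encoder.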
func (l *loopyWriter) applySettings(ss []http2.Setting) error {
	for _, s := range ss {
		switch s.ID {
		case http2.SettingInitialWindowSize:
			o := l.oiws
			l.oiws = s.Val
			if o < l.oiws {
				// If the new limit is greater, make all depleted streams active.
				for _, stream := range l.estdStreams {
					if stream.state == waitingOnStreamQuota {
						stream.state = active
						l.activeStreams.enqueue(stream)
					}
				}
			}
		case http2.SettingHeaderTableSize:
			updateHeaderTblSize(l.hEnc, s.Val)
		}
	}
	return nil
}

// processData removes the first stream from active streams, writes out at most 16KB
// of its data, and then puts it at the end of activeStreams if there's still more data
// to be sent and the stream has some stream-level flow control quota left.
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item of this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two buffers: h, which keeps the grpc-message header, and d,
	// which is the actual data.
	// As an optimization to keep wire traffic low, data from d is copied into h so that
	// the resulting frame is as big as possible, up to the maximum HTTP2 frame size.

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
		// Client sends out an empty data frame with endStream = true.
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from stream
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}
	var buf []byte
	// Figure out the maximum size we can send.
	maxSize := http2MaxFrameLen
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if maxSize > strQuota {
		maxSize = strQuota
	}
	if maxSize > int(l.sendQuota) { // connection-level flow control.
		maxSize = int(l.sendQuota)
	}
	// Compute how much of the header and data we can send within quota and max frame length.
	hSize := min(maxSize, len(dataItem.h))
	dSize := min(maxSize-hSize, len(dataItem.d))
	if hSize != 0 {
		if dSize == 0 {
			buf = dataItem.h
		} else {
			// We can add some data to the grpc message header to distribute bytes more equally across frames.
			// Copy on the stack to avoid generating garbage.
			var localBuf [http2MaxFrameLen]byte
			copy(localBuf[:hSize], dataItem.h)
			copy(localBuf[hSize:], dataItem.d[:dSize])
			buf = localBuf[:hSize+dSize]
		}
	} else {
		buf = dataItem.d
	}

	size := hSize + dSize

	// Now that outgoing flow controls are checked we can replenish str's write quota.
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
		endStream = true
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
		return false, err
	}
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	dataItem.h = dataItem.h[hSize:]
	dataItem.d = dataItem.d[dSize:]

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}