1// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import "fmt"
8
9// WriteScheduler is the interface implemented by HTTP/2 write schedulers.
10// Methods are never called concurrently.
11type WriteScheduler interface {
12	// OpenStream opens a new stream in the write scheduler.
13	// It is illegal to call this with streamID=0 or with a streamID that is
14	// already open -- the call may panic.
15	OpenStream(streamID uint32, options OpenStreamOptions)
16
17	// CloseStream closes a stream in the write scheduler. Any frames queued on
18	// this stream should be discarded. It is illegal to call this on a stream
19	// that is not open -- the call may panic.
20	CloseStream(streamID uint32)
21
22	// AdjustStream adjusts the priority of the given stream. This may be called
23	// on a stream that has not yet been opened or has been closed. Note that
24	// RFC 7540 allows PRIORITY frames to be sent on streams in any state. See:
25	// https://tools.ietf.org/html/rfc7540#section-5.1
26	AdjustStream(streamID uint32, priority PriorityParam)
27
28	// Push queues a frame in the scheduler. In most cases, this will not be
29	// called with wr.StreamID()!=0 unless that stream is currently open. The one
30	// exception is RST_STREAM frames, which may be sent on idle or closed streams.
31	Push(wr FrameWriteRequest)
32
33	// Pop dequeues the next frame to write. Returns false if no frames can
34	// be written. Frames with a given wr.StreamID() are Pop'd in the same
35	// order they are Push'd. No frames should be discarded except by CloseStream.
36	Pop() (wr FrameWriteRequest, ok bool)
37}
38
// OpenStreamOptions carries the additional settings accepted by
// WriteScheduler.OpenStream.
type OpenStreamOptions struct {
	// PusherID names the stream that pushed the newly opened stream.
	// It is zero when the stream was initiated by the client.
	PusherID uint32
}
45
46// FrameWriteRequest is a request to write a frame.
47type FrameWriteRequest struct {
48	// write is the interface value that does the writing, once the
49	// WriteScheduler has selected this frame to write. The write
50	// functions are all defined in write.go.
51	write writeFramer
52
53	// stream is the stream on which this frame will be written.
54	// nil for non-stream frames like PING and SETTINGS.
55	stream *stream
56
57	// done, if non-nil, must be a buffered channel with space for
58	// 1 message and is sent the return value from write (or an
59	// earlier error) when the frame has been written.
60	done chan error
61}
62
63// StreamID returns the id of the stream this frame will be written to.
64// 0 is used for non-stream frames such as PING and SETTINGS.
65func (wr FrameWriteRequest) StreamID() uint32 {
66	if wr.stream == nil {
67		if se, ok := wr.write.(StreamError); ok {
68			// (*serverConn).resetStream doesn't set
69			// stream because it doesn't necessarily have
70			// one. So special case this type of write
71			// message.
72			return se.StreamID
73		}
74		return 0
75	}
76	return wr.stream.id
77}
78
79// isControl reports whether wr is a control frame for MaxQueuedControlFrames
80// purposes. That includes non-stream frames and RST_STREAM frames.
81func (wr FrameWriteRequest) isControl() bool {
82	return wr.stream == nil
83}
84
85// DataSize returns the number of flow control bytes that must be consumed
86// to write this entire frame. This is 0 for non-DATA frames.
87func (wr FrameWriteRequest) DataSize() int {
88	if wd, ok := wr.write.(*writeData); ok {
89		return len(wd.p)
90	}
91	return 0
92}
93
94// Consume consumes min(n, available) bytes from this frame, where available
95// is the number of flow control bytes available on the stream. Consume returns
96// 0, 1, or 2 frames, where the integer return value gives the number of frames
97// returned.
98//
99// If flow control prevents consuming any bytes, this returns (_, _, 0). If
100// the entire frame was consumed, this returns (wr, _, 1). Otherwise, this
101// returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and
102// 'rest' contains the remaining bytes. The consumed bytes are deducted from the
103// underlying stream's flow control budget.
104func (wr FrameWriteRequest) Consume(n int32) (FrameWriteRequest, FrameWriteRequest, int) {
105	var empty FrameWriteRequest
106
107	// Non-DATA frames are always consumed whole.
108	wd, ok := wr.write.(*writeData)
109	if !ok || len(wd.p) == 0 {
110		return wr, empty, 1
111	}
112
113	// Might need to split after applying limits.
114	allowed := wr.stream.flow.available()
115	if n < allowed {
116		allowed = n
117	}
118	if wr.stream.sc.maxFrameSize < allowed {
119		allowed = wr.stream.sc.maxFrameSize
120	}
121	if allowed <= 0 {
122		return empty, empty, 0
123	}
124	if len(wd.p) > int(allowed) {
125		wr.stream.flow.take(allowed)
126		consumed := FrameWriteRequest{
127			stream: wr.stream,
128			write: &writeData{
129				streamID: wd.streamID,
130				p:        wd.p[:allowed],
131				// Even if the original had endStream set, there
132				// are bytes remaining because len(wd.p) > allowed,
133				// so we know endStream is false.
134				endStream: false,
135			},
136			// Our caller is blocking on the final DATA frame, not
137			// this intermediate frame, so no need to wait.
138			done: nil,
139		}
140		rest := FrameWriteRequest{
141			stream: wr.stream,
142			write: &writeData{
143				streamID:  wd.streamID,
144				p:         wd.p[allowed:],
145				endStream: wd.endStream,
146			},
147			done: wr.done,
148		}
149		return consumed, rest, 2
150	}
151
152	// The frame is consumed whole.
153	// NB: This cast cannot overflow because allowed is <= math.MaxInt32.
154	wr.stream.flow.take(int32(len(wd.p)))
155	return wr, empty, 1
156}
157
158// String is for debugging only.
159func (wr FrameWriteRequest) String() string {
160	var des string
161	if s, ok := wr.write.(fmt.Stringer); ok {
162		des = s.String()
163	} else {
164		des = fmt.Sprintf("%T", wr.write)
165	}
166	return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des)
167}
168
169// replyToWriter sends err to wr.done and panics if the send must block
170// This does nothing if wr.done is nil.
171func (wr *FrameWriteRequest) replyToWriter(err error) {
172	if wr.done == nil {
173		return
174	}
175	select {
176	case wr.done <- err:
177	default:
178		panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write))
179	}
180	wr.write = nil // prevent use (assume it's tainted after wr.done send)
181}
182
183// writeQueue is used by implementations of WriteScheduler.
184type writeQueue struct {
185	s []FrameWriteRequest
186}
187
188func (q *writeQueue) empty() bool { return len(q.s) == 0 }
189
190func (q *writeQueue) push(wr FrameWriteRequest) {
191	q.s = append(q.s, wr)
192}
193
194func (q *writeQueue) shift() FrameWriteRequest {
195	if len(q.s) == 0 {
196		panic("invalid use of queue")
197	}
198	wr := q.s[0]
199	// TODO: less copy-happy queue.
200	copy(q.s, q.s[1:])
201	q.s[len(q.s)-1] = FrameWriteRequest{}
202	q.s = q.s[:len(q.s)-1]
203	return wr
204}
205
206// consume consumes up to n bytes from q.s[0]. If the frame is
207// entirely consumed, it is removed from the queue. If the frame
208// is partially consumed, the frame is kept with the consumed
209// bytes removed. Returns true iff any bytes were consumed.
210func (q *writeQueue) consume(n int32) (FrameWriteRequest, bool) {
211	if len(q.s) == 0 {
212		return FrameWriteRequest{}, false
213	}
214	consumed, rest, numresult := q.s[0].Consume(n)
215	switch numresult {
216	case 0:
217		return FrameWriteRequest{}, false
218	case 1:
219		q.shift()
220	case 2:
221		q.s[0] = rest
222	}
223	return consumed, true
224}
225
226type writeQueuePool []*writeQueue
227
228// put inserts an unused writeQueue into the pool.
229func (p *writeQueuePool) put(q *writeQueue) {
230	for i := range q.s {
231		q.s[i] = FrameWriteRequest{}
232	}
233	q.s = q.s[:0]
234	*p = append(*p, q)
235}
236
237// get returns an empty writeQueue.
238func (p *writeQueuePool) get() *writeQueue {
239	ln := len(*p)
240	if ln == 0 {
241		return new(writeQueue)
242	}
243	x := ln - 1
244	q := (*p)[x]
245	(*p)[x] = nil
246	*p = (*p)[:x]
247	return q
248}
249