1// Copyright 2016 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8	"fmt"
9	"math"
10	"sort"
11)
12
// priorityDefaultWeight is the stored form of the default stream weight.
// RFC 7540, Section 5.3.5 defines the default weight as 16; weights are kept
// as (weight - 1) so that the range [1,256] fits in a uint8.
const priorityDefaultWeight = 16 - 1
15
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
// It is read by NewPriorityWriteScheduler, which copies the settings into
// the scheduler it constructs.
type PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree controls the maximum number of closed streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance: a stream's node is then removed
	// from the tree as soon as the stream is closed.
	//
	// See RFC 7540, Section 5.3.4:
	//   "It is possible for a stream to become closed while prioritization
	//   information ... is in transit. ... This potentially creates suboptimal
	//   prioritization, since the stream could be given a priority that is
	//   different from what is intended. To avoid these problems, an endpoint
	//   SHOULD retain stream prioritization state for a period after streams
	//   become closed. The longer state is retained, the lower the chance that
	//   streams are assigned incorrect or default priority values."
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree controls the maximum number of idle streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance: priority updates for streams
	// that are not yet in the tree are then ignored.
	//
	// See RFC 7540, Section 5.3.4:
	//   "Similarly, streams that are in the "idle" state can be assigned
	//   priority or become a parent of other streams. This allows for the
	//   creation of a grouping node in the dependency tree, which enables
	//   more flexible expressions of priority. Idle streams begin with a
	//   default priority (Section 5.3.5)."
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites enables write throttling to help ensure that
	// data is delivered in priority order. This works around a race where
	// stream B depends on stream A and both streams are about to call Write
	// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
	// write as much data from B as possible, but this is suboptimal because A
	// is a higher-priority stream. With throttling enabled, we write a small
	// amount of data from B to minimize the amount of bandwidth that B can
	// steal from A.
	ThrottleOutOfOrderWrites bool
}
54
55// NewPriorityWriteScheduler constructs a WriteScheduler that schedules
56// frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
57// If cfg is nil, default options are used.
58func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
59	if cfg == nil {
60		// For justification of these defaults, see:
61		// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
62		cfg = &PriorityWriteSchedulerConfig{
63			MaxClosedNodesInTree:     10,
64			MaxIdleNodesInTree:       10,
65			ThrottleOutOfOrderWrites: false,
66		}
67	}
68
69	ws := &priorityWriteScheduler{
70		nodes:                make(map[uint32]*priorityNode),
71		maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
72		maxIdleNodesInTree:   cfg.MaxIdleNodesInTree,
73		enableWriteThrottle:  cfg.ThrottleOutOfOrderWrites,
74	}
75	ws.nodes[0] = &ws.root
76	if cfg.ThrottleOutOfOrderWrites {
77		ws.writeThrottleLimit = 1024
78	} else {
79		ws.writeThrottleLimit = math.MaxInt32
80	}
81	return ws
82}
83
// priorityNodeState is the lifecycle state of the stream that a priorityNode
// represents.
type priorityNodeState int

const (
	// priorityNodeOpen is the zero value. OpenStream puts nodes in this
	// state, and it is the only state CloseStream accepts.
	priorityNodeOpen priorityNodeState = iota
	// priorityNodeClosed is set by CloseStream.
	priorityNodeClosed
	// priorityNodeIdle is given to nodes that AdjustStream creates for
	// streams that are not yet in the tree.
	priorityNodeIdle
)
91
// priorityNode is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
//
// Nodes are linked into the tree exclusively through setParent, which keeps
// the parent, kids, prev and next pointers consistent with each other.
type priorityNode struct {
	q            writeQueue        // queue of pending frames to write
	id           uint32            // id of the stream, or 0 for the root of the tree
	weight       uint8             // the actual weight is weight+1, so the value is in [1,256]
	state        priorityNodeState // open | closed | idle
	bytes        int64             // number of bytes written by this node, or 0 if closed
	subtreeBytes int64             // sum(node.bytes) of all nodes in this subtree

	// These links form the priority tree.
	// parent is nil for the root and for nodes detached from the tree.
	parent     *priorityNode
	kids       *priorityNode // start of the kids list
	prev, next *priorityNode // doubly-linked list of siblings
}
108
109func (n *priorityNode) setParent(parent *priorityNode) {
110	if n == parent {
111		panic("setParent to self")
112	}
113	if n.parent == parent {
114		return
115	}
116	// Unlink from current parent.
117	if parent := n.parent; parent != nil {
118		if n.prev == nil {
119			parent.kids = n.next
120		} else {
121			n.prev.next = n.next
122		}
123		if n.next != nil {
124			n.next.prev = n.prev
125		}
126	}
127	// Link to new parent.
128	// If parent=nil, remove n from the tree.
129	// Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
130	n.parent = parent
131	if parent == nil {
132		n.next = nil
133		n.prev = nil
134	} else {
135		n.next = parent.kids
136		n.prev = nil
137		if n.next != nil {
138			n.next.prev = n
139		}
140		parent.kids = n
141	}
142}
143
144func (n *priorityNode) addBytes(b int64) {
145	n.bytes += b
146	for ; n != nil; n = n.parent {
147		n.subtreeBytes += b
148	}
149}
150
151// walkReadyInOrder iterates over the tree in priority order, calling f for each node
152// with a non-empty write queue. When f returns true, this function returns true and the
153// walk halts. tmp is used as scratch space for sorting.
154//
155// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
156// if any ancestor p of n is still open (ignoring the root node).
157func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
158	if !n.q.empty() && f(n, openParent) {
159		return true
160	}
161	if n.kids == nil {
162		return false
163	}
164
165	// Don't consider the root "open" when updating openParent since
166	// we can't send data frames on the root stream (only control frames).
167	if n.id != 0 {
168		openParent = openParent || (n.state == priorityNodeOpen)
169	}
170
171	// Common case: only one kid or all kids have the same weight.
172	// Some clients don't use weights; other clients (like web browsers)
173	// use mostly-linear priority trees.
174	w := n.kids.weight
175	needSort := false
176	for k := n.kids.next; k != nil; k = k.next {
177		if k.weight != w {
178			needSort = true
179			break
180		}
181	}
182	if !needSort {
183		for k := n.kids; k != nil; k = k.next {
184			if k.walkReadyInOrder(openParent, tmp, f) {
185				return true
186			}
187		}
188		return false
189	}
190
191	// Uncommon case: sort the child nodes. We remove the kids from the parent,
192	// then re-insert after sorting so we can reuse tmp for future sort calls.
193	*tmp = (*tmp)[:0]
194	for n.kids != nil {
195		*tmp = append(*tmp, n.kids)
196		n.kids.setParent(nil)
197	}
198	sort.Sort(sortPriorityNodeSiblings(*tmp))
199	for i := len(*tmp) - 1; i >= 0; i-- {
200		(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
201	}
202	for k := n.kids; k != nil; k = k.next {
203		if k.walkReadyInOrder(openParent, tmp, f) {
204			return true
205		}
206	}
207	return false
208}
209
210type sortPriorityNodeSiblings []*priorityNode
211
212func (z sortPriorityNodeSiblings) Len() int      { return len(z) }
213func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
214func (z sortPriorityNodeSiblings) Less(i, k int) bool {
215	// Prefer the subtree that has sent fewer bytes relative to its weight.
216	// See sections 5.3.2 and 5.3.4.
217	wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
218	wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
219	if bi == 0 && bk == 0 {
220		return wi >= wk
221	}
222	if bk == 0 {
223		return false
224	}
225	return bi/bk <= wi/wk
226}
227
// priorityWriteScheduler is the WriteScheduler returned by
// NewPriorityWriteScheduler. It keeps one priorityNode per known stream,
// arranged in a tree rooted at root.
type priorityWriteScheduler struct {
	// root is the root of the priority tree, where root.id = 0.
	// The root queues control frames that are not associated with any stream.
	root priorityNode

	// nodes maps stream ids to priority tree nodes.
	// Entry 0 always points at root.
	nodes map[uint32]*priorityNode

	// maxID is the maximum stream id in nodes.
	// AdjustStream uses it to tell an already-removed stream (id <= maxID)
	// from a not-yet-seen idle stream (id > maxID).
	maxID uint32

	// lists of nodes that have been closed or are idle, but are kept in
	// the tree for improved prioritization. When the lengths exceed either
	// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
	// Both lists are ordered oldest first.
	closedNodes, idleNodes []*priorityNode

	// From the config.
	maxClosedNodesInTree int
	maxIdleNodesInTree   int
	// writeThrottleLimit is the consume limit Pop applies to a node that has
	// an open ancestor; Pop also raises and resets it.
	writeThrottleLimit   int32
	enableWriteThrottle  bool

	// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
	tmp []*priorityNode

	// pool of empty queues for reuse.
	queuePool writeQueuePool
}
256
257func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
258	// The stream may be currently idle but cannot be opened or closed.
259	if curr := ws.nodes[streamID]; curr != nil {
260		if curr.state != priorityNodeIdle {
261			panic(fmt.Sprintf("stream %d already opened", streamID))
262		}
263		curr.state = priorityNodeOpen
264		return
265	}
266
267	// RFC 7540, Section 5.3.5:
268	//  "All streams are initially assigned a non-exclusive dependency on stream 0x0.
269	//  Pushed streams initially depend on their associated stream. In both cases,
270	//  streams are assigned a default weight of 16."
271	parent := ws.nodes[options.PusherID]
272	if parent == nil {
273		parent = &ws.root
274	}
275	n := &priorityNode{
276		q:      *ws.queuePool.get(),
277		id:     streamID,
278		weight: priorityDefaultWeight,
279		state:  priorityNodeOpen,
280	}
281	n.setParent(parent)
282	ws.nodes[streamID] = n
283	if streamID > ws.maxID {
284		ws.maxID = streamID
285	}
286}
287
288func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
289	if streamID == 0 {
290		panic("violation of WriteScheduler interface: cannot close stream 0")
291	}
292	if ws.nodes[streamID] == nil {
293		panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
294	}
295	if ws.nodes[streamID].state != priorityNodeOpen {
296		panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
297	}
298
299	n := ws.nodes[streamID]
300	n.state = priorityNodeClosed
301	n.addBytes(-n.bytes)
302
303	q := n.q
304	ws.queuePool.put(&q)
305	n.q.s = nil
306	if ws.maxClosedNodesInTree > 0 {
307		ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
308	} else {
309		ws.removeNode(n)
310	}
311}
312
313func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
314	if streamID == 0 {
315		panic("adjustPriority on root")
316	}
317
318	// If streamID does not exist, there are two cases:
319	// - A closed stream that has been removed (this will have ID <= maxID)
320	// - An idle stream that is being used for "grouping" (this will have ID > maxID)
321	n := ws.nodes[streamID]
322	if n == nil {
323		if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
324			return
325		}
326		ws.maxID = streamID
327		n = &priorityNode{
328			q:      *ws.queuePool.get(),
329			id:     streamID,
330			weight: priorityDefaultWeight,
331			state:  priorityNodeIdle,
332		}
333		n.setParent(&ws.root)
334		ws.nodes[streamID] = n
335		ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
336	}
337
338	// Section 5.3.1: A dependency on a stream that is not currently in the tree
339	// results in that stream being given a default priority (Section 5.3.5).
340	parent := ws.nodes[priority.StreamDep]
341	if parent == nil {
342		n.setParent(&ws.root)
343		n.weight = priorityDefaultWeight
344		return
345	}
346
347	// Ignore if the client tries to make a node its own parent.
348	if n == parent {
349		return
350	}
351
352	// Section 5.3.3:
353	//   "If a stream is made dependent on one of its own dependencies, the
354	//   formerly dependent stream is first moved to be dependent on the
355	//   reprioritized stream's previous parent. The moved dependency retains
356	//   its weight."
357	//
358	// That is: if parent depends on n, move parent to depend on n.parent.
359	for x := parent.parent; x != nil; x = x.parent {
360		if x == n {
361			parent.setParent(n.parent)
362			break
363		}
364	}
365
366	// Section 5.3.3: The exclusive flag causes the stream to become the sole
367	// dependency of its parent stream, causing other dependencies to become
368	// dependent on the exclusive stream.
369	if priority.Exclusive {
370		k := parent.kids
371		for k != nil {
372			next := k.next
373			if k != n {
374				k.setParent(n)
375			}
376			k = next
377		}
378	}
379
380	n.setParent(parent)
381	n.weight = priority.Weight
382}
383
384func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
385	var n *priorityNode
386	if id := wr.StreamID(); id == 0 {
387		n = &ws.root
388	} else {
389		n = ws.nodes[id]
390		if n == nil {
391			// id is an idle or closed stream. wr should not be a HEADERS or
392			// DATA frame. However, wr can be a RST_STREAM. In this case, we
393			// push wr onto the root, rather than creating a new priorityNode,
394			// since RST_STREAM is tiny and the stream's priority is unknown
395			// anyway. See issue #17919.
396			if wr.DataSize() > 0 {
397				panic("add DATA on non-open stream")
398			}
399			n = &ws.root
400		}
401	}
402	n.q.push(wr)
403}
404
405func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
406	ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
407		limit := int32(math.MaxInt32)
408		if openParent {
409			limit = ws.writeThrottleLimit
410		}
411		wr, ok = n.q.consume(limit)
412		if !ok {
413			return false
414		}
415		n.addBytes(int64(wr.DataSize()))
416		// If B depends on A and B continuously has data available but A
417		// does not, gradually increase the throttling limit to allow B to
418		// steal more and more bandwidth from A.
419		if openParent {
420			ws.writeThrottleLimit += 1024
421			if ws.writeThrottleLimit < 0 {
422				ws.writeThrottleLimit = math.MaxInt32
423			}
424		} else if ws.enableWriteThrottle {
425			ws.writeThrottleLimit = 1024
426		}
427		return true
428	})
429	return wr, ok
430}
431
432func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
433	if maxSize == 0 {
434		return
435	}
436	if len(*list) == maxSize {
437		// Remove the oldest node, then shift left.
438		ws.removeNode((*list)[0])
439		x := (*list)[1:]
440		copy(*list, x)
441		*list = (*list)[:len(x)]
442	}
443	*list = append(*list, n)
444}
445
446func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
447	for k := n.kids; k != nil; k = k.next {
448		k.setParent(n.parent)
449	}
450	n.setParent(nil)
451	delete(ws.nodes, n.id)
452}
453