1package quic
2
3import (
4	"sync"
5
6	"github.com/lucas-clemente/quic-go/internal/flowcontrol"
7	"github.com/lucas-clemente/quic-go/internal/protocol"
8	"github.com/lucas-clemente/quic-go/internal/wire"
9)
10
11type windowUpdateQueue struct {
12	mutex sync.Mutex
13
14	queue      map[protocol.StreamID]struct{} // used as a set
15	queuedConn bool                           // connection-level window update
16
17	streamGetter       streamGetter
18	connFlowController flowcontrol.ConnectionFlowController
19	callback           func(wire.Frame)
20}
21
22func newWindowUpdateQueue(
23	streamGetter streamGetter,
24	connFC flowcontrol.ConnectionFlowController,
25	cb func(wire.Frame),
26) *windowUpdateQueue {
27	return &windowUpdateQueue{
28		queue:              make(map[protocol.StreamID]struct{}),
29		streamGetter:       streamGetter,
30		connFlowController: connFC,
31		callback:           cb,
32	}
33}
34
35func (q *windowUpdateQueue) AddStream(id protocol.StreamID) {
36	q.mutex.Lock()
37	q.queue[id] = struct{}{}
38	q.mutex.Unlock()
39}
40
41func (q *windowUpdateQueue) AddConnection() {
42	q.mutex.Lock()
43	q.queuedConn = true
44	q.mutex.Unlock()
45}
46
47func (q *windowUpdateQueue) QueueAll() {
48	q.mutex.Lock()
49	// queue a connection-level window update
50	if q.queuedConn {
51		q.callback(&wire.MaxDataFrame{MaximumData: q.connFlowController.GetWindowUpdate()})
52		q.queuedConn = false
53	}
54	// queue all stream-level window updates
55	for id := range q.queue {
56		delete(q.queue, id)
57		str, err := q.streamGetter.GetOrOpenReceiveStream(id)
58		if err != nil || str == nil { // the stream can be nil if it was completed before dequeing the window update
59			continue
60		}
61		offset := str.getWindowUpdate()
62		if offset == 0 { // can happen if we received a final offset, right after queueing the window update
63			continue
64		}
65		q.callback(&wire.MaxStreamDataFrame{
66			StreamID:          id,
67			MaximumStreamData: offset,
68		})
69	}
70	q.mutex.Unlock()
71}
72