1// This file was automatically generated by genny.
2// Any changes will be lost if this file is regenerated.
3// see https://github.com/cheekybits/genny
4
5package quic
6
7import (
8	"context"
9	"sync"
10
11	"github.com/lucas-clemente/quic-go/internal/protocol"
12	"github.com/lucas-clemente/quic-go/internal/wire"
13)
14
15type incomingBidiStreamsMap struct {
16	mutex         sync.RWMutex
17	newStreamChan chan struct{}
18
19	streams map[protocol.StreamNum]streamI
20	// When a stream is deleted before it was accepted, we can't delete it immediately.
21	// We need to wait until the application accepts it, and delete it immediately then.
22	streamsToDelete map[protocol.StreamNum]struct{} // used as a set
23
24	nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream()
25	nextStreamToOpen   protocol.StreamNum // the highest stream that the peer opened
26	maxStream          protocol.StreamNum // the highest stream that the peer is allowed to open
27	maxNumStreams      uint64             // maximum number of streams
28
29	newStream        func(protocol.StreamNum) streamI
30	queueMaxStreamID func(*wire.MaxStreamsFrame)
31
32	closeErr error
33}
34
35func newIncomingBidiStreamsMap(
36	newStream func(protocol.StreamNum) streamI,
37	maxStreams uint64,
38	queueControlFrame func(wire.Frame),
39) *incomingBidiStreamsMap {
40	return &incomingBidiStreamsMap{
41		newStreamChan:      make(chan struct{}, 1),
42		streams:            make(map[protocol.StreamNum]streamI),
43		streamsToDelete:    make(map[protocol.StreamNum]struct{}),
44		maxStream:          protocol.StreamNum(maxStreams),
45		maxNumStreams:      maxStreams,
46		newStream:          newStream,
47		nextStreamToOpen:   1,
48		nextStreamToAccept: 1,
49		queueMaxStreamID:   func(f *wire.MaxStreamsFrame) { queueControlFrame(f) },
50	}
51}
52
53func (m *incomingBidiStreamsMap) AcceptStream(ctx context.Context) (streamI, error) {
54	// drain the newStreamChan, so we don't check the map twice if the stream doesn't exist
55	select {
56	case <-m.newStreamChan:
57	default:
58	}
59
60	m.mutex.Lock()
61
62	var num protocol.StreamNum
63	var str streamI
64	for {
65		num = m.nextStreamToAccept
66		if m.closeErr != nil {
67			m.mutex.Unlock()
68			return nil, m.closeErr
69		}
70		var ok bool
71		str, ok = m.streams[num]
72		if ok {
73			break
74		}
75		m.mutex.Unlock()
76		select {
77		case <-ctx.Done():
78			return nil, ctx.Err()
79		case <-m.newStreamChan:
80		}
81		m.mutex.Lock()
82	}
83	m.nextStreamToAccept++
84	// If this stream was completed before being accepted, we can delete it now.
85	if _, ok := m.streamsToDelete[num]; ok {
86		delete(m.streamsToDelete, num)
87		if err := m.deleteStream(num); err != nil {
88			m.mutex.Unlock()
89			return nil, err
90		}
91	}
92	m.mutex.Unlock()
93	return str, nil
94}
95
96func (m *incomingBidiStreamsMap) GetOrOpenStream(num protocol.StreamNum) (streamI, error) {
97	m.mutex.RLock()
98	if num > m.maxStream {
99		m.mutex.RUnlock()
100		return nil, streamError{
101			message: "peer tried to open stream %d (current limit: %d)",
102			nums:    []protocol.StreamNum{num, m.maxStream},
103		}
104	}
105	// if the num is smaller than the highest we accepted
106	// * this stream exists in the map, and we can return it, or
107	// * this stream was already closed, then we can return the nil
108	if num < m.nextStreamToOpen {
109		var s streamI
110		// If the stream was already queued for deletion, and is just waiting to be accepted, don't return it.
111		if _, ok := m.streamsToDelete[num]; !ok {
112			s = m.streams[num]
113		}
114		m.mutex.RUnlock()
115		return s, nil
116	}
117	m.mutex.RUnlock()
118
119	m.mutex.Lock()
120	// no need to check the two error conditions from above again
121	// * maxStream can only increase, so if the id was valid before, it definitely is valid now
122	// * highestStream is only modified by this function
123	for newNum := m.nextStreamToOpen; newNum <= num; newNum++ {
124		m.streams[newNum] = m.newStream(newNum)
125		select {
126		case m.newStreamChan <- struct{}{}:
127		default:
128		}
129	}
130	m.nextStreamToOpen = num + 1
131	s := m.streams[num]
132	m.mutex.Unlock()
133	return s, nil
134}
135
136func (m *incomingBidiStreamsMap) DeleteStream(num protocol.StreamNum) error {
137	m.mutex.Lock()
138	defer m.mutex.Unlock()
139
140	return m.deleteStream(num)
141}
142
143func (m *incomingBidiStreamsMap) deleteStream(num protocol.StreamNum) error {
144	if _, ok := m.streams[num]; !ok {
145		return streamError{
146			message: "Tried to delete unknown incoming stream %d",
147			nums:    []protocol.StreamNum{num},
148		}
149	}
150
151	// Don't delete this stream yet, if it was not yet accepted.
152	// Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted.
153	if num >= m.nextStreamToAccept {
154		if _, ok := m.streamsToDelete[num]; ok {
155			return streamError{
156				message: "Tried to delete incoming stream %d multiple times",
157				nums:    []protocol.StreamNum{num},
158			}
159		}
160		m.streamsToDelete[num] = struct{}{}
161		return nil
162	}
163
164	delete(m.streams, num)
165	// queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream
166	if m.maxNumStreams > uint64(len(m.streams)) {
167		maxStream := m.nextStreamToOpen + protocol.StreamNum(m.maxNumStreams-uint64(len(m.streams))) - 1
168		// Never send a value larger than protocol.MaxStreamCount.
169		if maxStream <= protocol.MaxStreamCount {
170			m.maxStream = maxStream
171			m.queueMaxStreamID(&wire.MaxStreamsFrame{
172				Type:         protocol.StreamTypeBidi,
173				MaxStreamNum: m.maxStream,
174			})
175		}
176	}
177	return nil
178}
179
180func (m *incomingBidiStreamsMap) CloseWithError(err error) {
181	m.mutex.Lock()
182	m.closeErr = err
183	for _, str := range m.streams {
184		str.closeForShutdown(err)
185	}
186	m.mutex.Unlock()
187	close(m.newStreamChan)
188}
189