1// This file was automatically generated by genny.
2// Any changes will be lost if this file is regenerated.
3// see https://github.com/cheekybits/genny
4
5package quic
6
7import (
8	"context"
9	"sync"
10
11	"github.com/lucas-clemente/quic-go/internal/protocol"
12	"github.com/lucas-clemente/quic-go/internal/wire"
13)
14
// receiveStreamIEntry is the value type of the streams map: a stream plus a
// flag recording that its deletion was requested before it was accepted.
//
// When a stream is deleted before it was accepted, we can't delete it from the map immediately.
// We need to wait until the application accepts it, and delete it then.
// In that case deleteStream sets shouldDelete instead of removing the entry,
// and AcceptStream performs the actual deletion once it hands the stream out.
type receiveStreamIEntry struct {
	stream       receiveStreamI
	shouldDelete bool
}
21
// incomingUniStreamsMap keeps track of the streams opened by the peer.
// Streams are created on demand by GetOrOpenStream, handed out in increasing
// stream number order by AcceptStream, and removed by DeleteStream, which also
// raises the limit on the stream numbers the peer may use.
type incomingUniStreamsMap struct {
	// mutex protects the mutable state of the map.
	// newStreamChan has capacity 1; GetOrOpenStream does a non-blocking send on it
	// for every stream it creates, and CloseWithError closes it. AcceptStream waits on it.
	mutex         sync.RWMutex
	newStreamChan chan struct{}

	// streams contains every opened stream that has not been removed yet, keyed by stream number.
	streams map[protocol.StreamNum]receiveStreamIEntry

	nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream()
	nextStreamToOpen   protocol.StreamNum // the lowest stream number not opened yet (one above the highest stream that the peer opened)
	maxStream          protocol.StreamNum // the highest stream that the peer is allowed to open
	maxNumStreams      uint64             // maximum number of streams

	// newStream creates the stream object for a given stream number.
	// queueMaxStreamID is called with the frame announcing a raised stream limit.
	newStream        func(protocol.StreamNum) receiveStreamI
	queueMaxStreamID func(*wire.MaxStreamsFrame)

	// closeErr is set by CloseWithError; once non-nil, AcceptStream returns it.
	closeErr error
}
38
39func newIncomingUniStreamsMap(
40	newStream func(protocol.StreamNum) receiveStreamI,
41	maxStreams uint64,
42	queueControlFrame func(wire.Frame),
43) *incomingUniStreamsMap {
44	return &incomingUniStreamsMap{
45		newStreamChan:      make(chan struct{}, 1),
46		streams:            make(map[protocol.StreamNum]receiveStreamIEntry),
47		maxStream:          protocol.StreamNum(maxStreams),
48		maxNumStreams:      maxStreams,
49		newStream:          newStream,
50		nextStreamToOpen:   1,
51		nextStreamToAccept: 1,
52		queueMaxStreamID:   func(f *wire.MaxStreamsFrame) { queueControlFrame(f) },
53	}
54}
55
56func (m *incomingUniStreamsMap) AcceptStream(ctx context.Context) (receiveStreamI, error) {
57	// drain the newStreamChan, so we don't check the map twice if the stream doesn't exist
58	select {
59	case <-m.newStreamChan:
60	default:
61	}
62
63	m.mutex.Lock()
64
65	var num protocol.StreamNum
66	var entry receiveStreamIEntry
67	for {
68		num = m.nextStreamToAccept
69		if m.closeErr != nil {
70			m.mutex.Unlock()
71			return nil, m.closeErr
72		}
73		var ok bool
74		entry, ok = m.streams[num]
75		if ok {
76			break
77		}
78		m.mutex.Unlock()
79		select {
80		case <-ctx.Done():
81			return nil, ctx.Err()
82		case <-m.newStreamChan:
83		}
84		m.mutex.Lock()
85	}
86	m.nextStreamToAccept++
87	// If this stream was completed before being accepted, we can delete it now.
88	if entry.shouldDelete {
89		if err := m.deleteStream(num); err != nil {
90			m.mutex.Unlock()
91			return nil, err
92		}
93	}
94	m.mutex.Unlock()
95	return entry.stream, nil
96}
97
98func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receiveStreamI, error) {
99	m.mutex.RLock()
100	if num > m.maxStream {
101		m.mutex.RUnlock()
102		return nil, streamError{
103			message: "peer tried to open stream %d (current limit: %d)",
104			nums:    []protocol.StreamNum{num, m.maxStream},
105		}
106	}
107	// if the num is smaller than the highest we accepted
108	// * this stream exists in the map, and we can return it, or
109	// * this stream was already closed, then we can return the nil
110	if num < m.nextStreamToOpen {
111		var s receiveStreamI
112		// If the stream was already queued for deletion, and is just waiting to be accepted, don't return it.
113		if entry, ok := m.streams[num]; ok && !entry.shouldDelete {
114			s = entry.stream
115		}
116		m.mutex.RUnlock()
117		return s, nil
118	}
119	m.mutex.RUnlock()
120
121	m.mutex.Lock()
122	// no need to check the two error conditions from above again
123	// * maxStream can only increase, so if the id was valid before, it definitely is valid now
124	// * highestStream is only modified by this function
125	for newNum := m.nextStreamToOpen; newNum <= num; newNum++ {
126		m.streams[newNum] = receiveStreamIEntry{stream: m.newStream(newNum)}
127		select {
128		case m.newStreamChan <- struct{}{}:
129		default:
130		}
131	}
132	m.nextStreamToOpen = num + 1
133	entry := m.streams[num]
134	m.mutex.Unlock()
135	return entry.stream, nil
136}
137
138func (m *incomingUniStreamsMap) DeleteStream(num protocol.StreamNum) error {
139	m.mutex.Lock()
140	defer m.mutex.Unlock()
141
142	return m.deleteStream(num)
143}
144
145func (m *incomingUniStreamsMap) deleteStream(num protocol.StreamNum) error {
146	if _, ok := m.streams[num]; !ok {
147		return streamError{
148			message: "tried to delete unknown incoming stream %d",
149			nums:    []protocol.StreamNum{num},
150		}
151	}
152
153	// Don't delete this stream yet, if it was not yet accepted.
154	// Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted.
155	if num >= m.nextStreamToAccept {
156		entry, ok := m.streams[num]
157		if ok && entry.shouldDelete {
158			return streamError{
159				message: "tried to delete incoming stream %d multiple times",
160				nums:    []protocol.StreamNum{num},
161			}
162		}
163		entry.shouldDelete = true
164		m.streams[num] = entry // can't assign to struct in map, so we need to reassign
165		return nil
166	}
167
168	delete(m.streams, num)
169	// queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream
170	if m.maxNumStreams > uint64(len(m.streams)) {
171		maxStream := m.nextStreamToOpen + protocol.StreamNum(m.maxNumStreams-uint64(len(m.streams))) - 1
172		// Never send a value larger than protocol.MaxStreamCount.
173		if maxStream <= protocol.MaxStreamCount {
174			m.maxStream = maxStream
175			m.queueMaxStreamID(&wire.MaxStreamsFrame{
176				Type:         protocol.StreamTypeUni,
177				MaxStreamNum: m.maxStream,
178			})
179		}
180	}
181	return nil
182}
183
184func (m *incomingUniStreamsMap) CloseWithError(err error) {
185	m.mutex.Lock()
186	m.closeErr = err
187	for _, entry := range m.streams {
188		entry.stream.closeForShutdown(err)
189	}
190	m.mutex.Unlock()
191	close(m.newStreamChan)
192}
193