1package quic
2
3import (
4	"context"
5	"sync"
6
7	"github.com/lucas-clemente/quic-go/internal/protocol"
8	"github.com/lucas-clemente/quic-go/internal/wire"
9)
10
11// When a stream is deleted before it was accepted, we can't delete it from the map immediately.
12// We need to wait until the application accepts it, and delete it then.
13type itemEntry struct {
14	stream       item
15	shouldDelete bool
16}
17
18//go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi"
19//go:generate genny -in $GOFILE -out streams_map_incoming_uni.go gen "item=receiveStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni"
20type incomingItemsMap struct {
21	mutex         sync.RWMutex
22	newStreamChan chan struct{}
23
24	streams map[protocol.StreamNum]itemEntry
25
26	nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream()
27	nextStreamToOpen   protocol.StreamNum // the highest stream that the peer opened
28	maxStream          protocol.StreamNum // the highest stream that the peer is allowed to open
29	maxNumStreams      uint64             // maximum number of streams
30
31	newStream        func(protocol.StreamNum) item
32	queueMaxStreamID func(*wire.MaxStreamsFrame)
33
34	closeErr error
35}
36
37func newIncomingItemsMap(
38	newStream func(protocol.StreamNum) item,
39	maxStreams uint64,
40	queueControlFrame func(wire.Frame),
41) *incomingItemsMap {
42	return &incomingItemsMap{
43		newStreamChan:      make(chan struct{}, 1),
44		streams:            make(map[protocol.StreamNum]itemEntry),
45		maxStream:          protocol.StreamNum(maxStreams),
46		maxNumStreams:      maxStreams,
47		newStream:          newStream,
48		nextStreamToOpen:   1,
49		nextStreamToAccept: 1,
50		queueMaxStreamID:   func(f *wire.MaxStreamsFrame) { queueControlFrame(f) },
51	}
52}
53
54func (m *incomingItemsMap) AcceptStream(ctx context.Context) (item, error) {
55	// drain the newStreamChan, so we don't check the map twice if the stream doesn't exist
56	select {
57	case <-m.newStreamChan:
58	default:
59	}
60
61	m.mutex.Lock()
62
63	var num protocol.StreamNum
64	var entry itemEntry
65	for {
66		num = m.nextStreamToAccept
67		if m.closeErr != nil {
68			m.mutex.Unlock()
69			return nil, m.closeErr
70		}
71		var ok bool
72		entry, ok = m.streams[num]
73		if ok {
74			break
75		}
76		m.mutex.Unlock()
77		select {
78		case <-ctx.Done():
79			return nil, ctx.Err()
80		case <-m.newStreamChan:
81		}
82		m.mutex.Lock()
83	}
84	m.nextStreamToAccept++
85	// If this stream was completed before being accepted, we can delete it now.
86	if entry.shouldDelete {
87		if err := m.deleteStream(num); err != nil {
88			m.mutex.Unlock()
89			return nil, err
90		}
91	}
92	m.mutex.Unlock()
93	return entry.stream, nil
94}
95
96func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error) {
97	m.mutex.RLock()
98	if num > m.maxStream {
99		m.mutex.RUnlock()
100		return nil, streamError{
101			message: "peer tried to open stream %d (current limit: %d)",
102			nums:    []protocol.StreamNum{num, m.maxStream},
103		}
104	}
105	// if the num is smaller than the highest we accepted
106	// * this stream exists in the map, and we can return it, or
107	// * this stream was already closed, then we can return the nil
108	if num < m.nextStreamToOpen {
109		var s item
110		// If the stream was already queued for deletion, and is just waiting to be accepted, don't return it.
111		if entry, ok := m.streams[num]; ok && !entry.shouldDelete {
112			s = entry.stream
113		}
114		m.mutex.RUnlock()
115		return s, nil
116	}
117	m.mutex.RUnlock()
118
119	m.mutex.Lock()
120	// no need to check the two error conditions from above again
121	// * maxStream can only increase, so if the id was valid before, it definitely is valid now
122	// * highestStream is only modified by this function
123	for newNum := m.nextStreamToOpen; newNum <= num; newNum++ {
124		m.streams[newNum] = itemEntry{stream: m.newStream(newNum)}
125		select {
126		case m.newStreamChan <- struct{}{}:
127		default:
128		}
129	}
130	m.nextStreamToOpen = num + 1
131	entry := m.streams[num]
132	m.mutex.Unlock()
133	return entry.stream, nil
134}
135
136func (m *incomingItemsMap) DeleteStream(num protocol.StreamNum) error {
137	m.mutex.Lock()
138	defer m.mutex.Unlock()
139
140	return m.deleteStream(num)
141}
142
143func (m *incomingItemsMap) deleteStream(num protocol.StreamNum) error {
144	if _, ok := m.streams[num]; !ok {
145		return streamError{
146			message: "tried to delete unknown incoming stream %d",
147			nums:    []protocol.StreamNum{num},
148		}
149	}
150
151	// Don't delete this stream yet, if it was not yet accepted.
152	// Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted.
153	if num >= m.nextStreamToAccept {
154		entry, ok := m.streams[num]
155		if ok && entry.shouldDelete {
156			return streamError{
157				message: "tried to delete incoming stream %d multiple times",
158				nums:    []protocol.StreamNum{num},
159			}
160		}
161		entry.shouldDelete = true
162		m.streams[num] = entry // can't assign to struct in map, so we need to reassign
163		return nil
164	}
165
166	delete(m.streams, num)
167	// queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream
168	if m.maxNumStreams > uint64(len(m.streams)) {
169		maxStream := m.nextStreamToOpen + protocol.StreamNum(m.maxNumStreams-uint64(len(m.streams))) - 1
170		// Never send a value larger than protocol.MaxStreamCount.
171		if maxStream <= protocol.MaxStreamCount {
172			m.maxStream = maxStream
173			m.queueMaxStreamID(&wire.MaxStreamsFrame{
174				Type:         streamTypeGeneric,
175				MaxStreamNum: m.maxStream,
176			})
177		}
178	}
179	return nil
180}
181
182func (m *incomingItemsMap) CloseWithError(err error) {
183	m.mutex.Lock()
184	m.closeErr = err
185	for _, entry := range m.streams {
186		entry.stream.closeForShutdown(err)
187	}
188	m.mutex.Unlock()
189	close(m.newStreamChan)
190}
191