1package quic 2 3import ( 4 "context" 5 "sync" 6 7 "github.com/lucas-clemente/quic-go/internal/protocol" 8 "github.com/lucas-clemente/quic-go/internal/wire" 9) 10 11// When a stream is deleted before it was accepted, we can't delete it from the map immediately. 12// We need to wait until the application accepts it, and delete it then. 13type itemEntry struct { 14 stream item 15 shouldDelete bool 16} 17 18//go:generate genny -in $GOFILE -out streams_map_incoming_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi" 19//go:generate genny -in $GOFILE -out streams_map_incoming_uni.go gen "item=receiveStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni" 20type incomingItemsMap struct { 21 mutex sync.RWMutex 22 newStreamChan chan struct{} 23 24 streams map[protocol.StreamNum]itemEntry 25 26 nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream() 27 nextStreamToOpen protocol.StreamNum // the highest stream that the peer opened 28 maxStream protocol.StreamNum // the highest stream that the peer is allowed to open 29 maxNumStreams uint64 // maximum number of streams 30 31 newStream func(protocol.StreamNum) item 32 queueMaxStreamID func(*wire.MaxStreamsFrame) 33 34 closeErr error 35} 36 37func newIncomingItemsMap( 38 newStream func(protocol.StreamNum) item, 39 maxStreams uint64, 40 queueControlFrame func(wire.Frame), 41) *incomingItemsMap { 42 return &incomingItemsMap{ 43 newStreamChan: make(chan struct{}, 1), 44 streams: make(map[protocol.StreamNum]itemEntry), 45 maxStream: protocol.StreamNum(maxStreams), 46 maxNumStreams: maxStreams, 47 newStream: newStream, 48 nextStreamToOpen: 1, 49 nextStreamToAccept: 1, 50 queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, 51 } 52} 53 54func (m *incomingItemsMap) AcceptStream(ctx context.Context) (item, error) { 55 // drain the newStreamChan, so we don't check the map twice if the stream doesn't exist 56 select { 57 case <-m.newStreamChan: 58 default: 59 } 60 61 m.mutex.Lock() 62 63 var num protocol.StreamNum 64 var entry itemEntry 65 for { 66 num = m.nextStreamToAccept 67 if m.closeErr != nil { 68 m.mutex.Unlock() 69 return nil, m.closeErr 70 } 71 var ok bool 72 entry, ok = m.streams[num] 73 if ok { 74 break 75 } 76 m.mutex.Unlock() 77 select { 78 case <-ctx.Done(): 79 return nil, ctx.Err() 80 case <-m.newStreamChan: 81 } 82 m.mutex.Lock() 83 } 84 m.nextStreamToAccept++ 85 // If this stream was completed before being accepted, we can delete it now. 86 if entry.shouldDelete { 87 if err := m.deleteStream(num); err != nil { 88 m.mutex.Unlock() 89 return nil, err 90 } 91 } 92 m.mutex.Unlock() 93 return entry.stream, nil 94} 95 96func (m *incomingItemsMap) GetOrOpenStream(num protocol.StreamNum) (item, error) { 97 m.mutex.RLock() 98 if num > m.maxStream { 99 m.mutex.RUnlock() 100 return nil, streamError{ 101 message: "peer tried to open stream %d (current limit: %d)", 102 nums: []protocol.StreamNum{num, m.maxStream}, 103 } 104 } 105 // if the num is smaller than the highest we accepted 106 // * this stream exists in the map, and we can return it, or 107 // * this stream was already closed, then we can return the nil 108 if num < m.nextStreamToOpen { 109 var s item 110 // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. 111 if entry, ok := m.streams[num]; ok && !entry.shouldDelete { 112 s = entry.stream 113 } 114 m.mutex.RUnlock() 115 return s, nil 116 } 117 m.mutex.RUnlock() 118 119 m.mutex.Lock() 120 // no need to check the two error conditions from above again 121 // * maxStream can only increase, so if the id was valid before, it definitely is valid now 122 // * highestStream is only modified by this function 123 for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { 124 m.streams[newNum] = itemEntry{stream: m.newStream(newNum)} 125 select { 126 case m.newStreamChan <- struct{}{}: 127 default: 128 } 129 } 130 m.nextStreamToOpen = num + 1 131 entry := m.streams[num] 132 m.mutex.Unlock() 133 return entry.stream, nil 134} 135 136func (m *incomingItemsMap) DeleteStream(num protocol.StreamNum) error { 137 m.mutex.Lock() 138 defer m.mutex.Unlock() 139 140 return m.deleteStream(num) 141} 142 143func (m *incomingItemsMap) deleteStream(num protocol.StreamNum) error { 144 if _, ok := m.streams[num]; !ok { 145 return streamError{ 146 message: "tried to delete unknown incoming stream %d", 147 nums: []protocol.StreamNum{num}, 148 } 149 } 150 151 // Don't delete this stream yet, if it was not yet accepted. 152 // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. 153 if num >= m.nextStreamToAccept { 154 entry, ok := m.streams[num] 155 if ok && entry.shouldDelete { 156 return streamError{ 157 message: "tried to delete incoming stream %d multiple times", 158 nums: []protocol.StreamNum{num}, 159 } 160 } 161 entry.shouldDelete = true 162 m.streams[num] = entry // can't assign to struct in map, so we need to reassign 163 return nil 164 } 165 166 delete(m.streams, num) 167 // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream 168 if m.maxNumStreams > uint64(len(m.streams)) { 169 maxStream := m.nextStreamToOpen + protocol.StreamNum(m.maxNumStreams-uint64(len(m.streams))) - 1 170 // Never send a value larger than protocol.MaxStreamCount. 171 if maxStream <= protocol.MaxStreamCount { 172 m.maxStream = maxStream 173 m.queueMaxStreamID(&wire.MaxStreamsFrame{ 174 Type: streamTypeGeneric, 175 MaxStreamNum: m.maxStream, 176 }) 177 } 178 } 179 return nil 180} 181 182func (m *incomingItemsMap) CloseWithError(err error) { 183 m.mutex.Lock() 184 m.closeErr = err 185 for _, entry := range m.streams { 186 entry.stream.closeForShutdown(err) 187 } 188 m.mutex.Unlock() 189 close(m.newStreamChan) 190} 191