1// This file was automatically generated by genny. 2// Any changes will be lost if this file is regenerated. 3// see https://github.com/cheekybits/genny 4 5package quic 6 7import ( 8 "context" 9 "sync" 10 11 "github.com/lucas-clemente/quic-go/internal/protocol" 12 "github.com/lucas-clemente/quic-go/internal/wire" 13) 14 15// When a stream is deleted before it was accepted, we can't delete it from the map immediately. 16// We need to wait until the application accepts it, and delete it then. 17type receiveStreamIEntry struct { 18 stream receiveStreamI 19 shouldDelete bool 20} 21 22type incomingUniStreamsMap struct { 23 mutex sync.RWMutex 24 newStreamChan chan struct{} 25 26 streams map[protocol.StreamNum]receiveStreamIEntry 27 28 nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream() 29 nextStreamToOpen protocol.StreamNum // the highest stream that the peer opened 30 maxStream protocol.StreamNum // the highest stream that the peer is allowed to open 31 maxNumStreams uint64 // maximum number of streams 32 33 newStream func(protocol.StreamNum) receiveStreamI 34 queueMaxStreamID func(*wire.MaxStreamsFrame) 35 36 closeErr error 37} 38 39func newIncomingUniStreamsMap( 40 newStream func(protocol.StreamNum) receiveStreamI, 41 maxStreams uint64, 42 queueControlFrame func(wire.Frame), 43) *incomingUniStreamsMap { 44 return &incomingUniStreamsMap{ 45 newStreamChan: make(chan struct{}, 1), 46 streams: make(map[protocol.StreamNum]receiveStreamIEntry), 47 maxStream: protocol.StreamNum(maxStreams), 48 maxNumStreams: maxStreams, 49 newStream: newStream, 50 nextStreamToOpen: 1, 51 nextStreamToAccept: 1, 52 queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, 53 } 54} 55 56func (m *incomingUniStreamsMap) AcceptStream(ctx context.Context) (receiveStreamI, error) { 57 // drain the newStreamChan, so we don't check the map twice if the stream doesn't exist 58 select { 59 case <-m.newStreamChan: 60 default: 61 } 62 63 m.mutex.Lock() 64 65 var num protocol.StreamNum 66 var entry receiveStreamIEntry 67 for { 68 num = m.nextStreamToAccept 69 if m.closeErr != nil { 70 m.mutex.Unlock() 71 return nil, m.closeErr 72 } 73 var ok bool 74 entry, ok = m.streams[num] 75 if ok { 76 break 77 } 78 m.mutex.Unlock() 79 select { 80 case <-ctx.Done(): 81 return nil, ctx.Err() 82 case <-m.newStreamChan: 83 } 84 m.mutex.Lock() 85 } 86 m.nextStreamToAccept++ 87 // If this stream was completed before being accepted, we can delete it now. 88 if entry.shouldDelete { 89 if err := m.deleteStream(num); err != nil { 90 m.mutex.Unlock() 91 return nil, err 92 } 93 } 94 m.mutex.Unlock() 95 return entry.stream, nil 96} 97 98func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receiveStreamI, error) { 99 m.mutex.RLock() 100 if num > m.maxStream { 101 m.mutex.RUnlock() 102 return nil, streamError{ 103 message: "peer tried to open stream %d (current limit: %d)", 104 nums: []protocol.StreamNum{num, m.maxStream}, 105 } 106 } 107 // if the num is smaller than the highest we accepted 108 // * this stream exists in the map, and we can return it, or 109 // * this stream was already closed, then we can return the nil 110 if num < m.nextStreamToOpen { 111 var s receiveStreamI 112 // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. 113 if entry, ok := m.streams[num]; ok && !entry.shouldDelete { 114 s = entry.stream 115 } 116 m.mutex.RUnlock() 117 return s, nil 118 } 119 m.mutex.RUnlock() 120 121 m.mutex.Lock() 122 // no need to check the two error conditions from above again 123 // * maxStream can only increase, so if the id was valid before, it definitely is valid now 124 // * highestStream is only modified by this function 125 for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { 126 m.streams[newNum] = receiveStreamIEntry{stream: m.newStream(newNum)} 127 select { 128 case m.newStreamChan <- struct{}{}: 129 default: 130 } 131 } 132 m.nextStreamToOpen = num + 1 133 entry := m.streams[num] 134 m.mutex.Unlock() 135 return entry.stream, nil 136} 137 138func (m *incomingUniStreamsMap) DeleteStream(num protocol.StreamNum) error { 139 m.mutex.Lock() 140 defer m.mutex.Unlock() 141 142 return m.deleteStream(num) 143} 144 145func (m *incomingUniStreamsMap) deleteStream(num protocol.StreamNum) error { 146 if _, ok := m.streams[num]; !ok { 147 return streamError{ 148 message: "tried to delete unknown incoming stream %d", 149 nums: []protocol.StreamNum{num}, 150 } 151 } 152 153 // Don't delete this stream yet, if it was not yet accepted. 154 // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. 155 if num >= m.nextStreamToAccept { 156 entry, ok := m.streams[num] 157 if ok && entry.shouldDelete { 158 return streamError{ 159 message: "tried to delete incoming stream %d multiple times", 160 nums: []protocol.StreamNum{num}, 161 } 162 } 163 entry.shouldDelete = true 164 m.streams[num] = entry // can't assign to struct in map, so we need to reassign 165 return nil 166 } 167 168 delete(m.streams, num) 169 // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream 170 if m.maxNumStreams > uint64(len(m.streams)) { 171 maxStream := m.nextStreamToOpen + protocol.StreamNum(m.maxNumStreams-uint64(len(m.streams))) - 1 172 // Never send a value larger than protocol.MaxStreamCount. 173 if maxStream <= protocol.MaxStreamCount { 174 m.maxStream = maxStream 175 m.queueMaxStreamID(&wire.MaxStreamsFrame{ 176 Type: protocol.StreamTypeUni, 177 MaxStreamNum: m.maxStream, 178 }) 179 } 180 } 181 return nil 182} 183 184func (m *incomingUniStreamsMap) CloseWithError(err error) { 185 m.mutex.Lock() 186 m.closeErr = err 187 for _, entry := range m.streams { 188 entry.stream.closeForShutdown(err) 189 } 190 m.mutex.Unlock() 191 close(m.newStreamChan) 192} 193