1// This file was automatically generated by genny. 2// Any changes will be lost if this file is regenerated. 3// see https://github.com/cheekybits/genny 4 5package quic 6 7import ( 8 "context" 9 "sync" 10 11 "github.com/lucas-clemente/quic-go/internal/protocol" 12 "github.com/lucas-clemente/quic-go/internal/wire" 13) 14 15type outgoingBidiStreamsMap struct { 16 mutex sync.RWMutex 17 18 streams map[protocol.StreamNum]streamI 19 20 openQueue map[uint64]chan struct{} 21 lowestInQueue uint64 22 highestInQueue uint64 23 24 nextStream protocol.StreamNum // stream ID of the stream returned by OpenStream(Sync) 25 maxStream protocol.StreamNum // the maximum stream ID we're allowed to open 26 blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream 27 28 newStream func(protocol.StreamNum) streamI 29 queueStreamIDBlocked func(*wire.StreamsBlockedFrame) 30 31 closeErr error 32} 33 34func newOutgoingBidiStreamsMap( 35 newStream func(protocol.StreamNum) streamI, 36 queueControlFrame func(wire.Frame), 37) *outgoingBidiStreamsMap { 38 return &outgoingBidiStreamsMap{ 39 streams: make(map[protocol.StreamNum]streamI), 40 openQueue: make(map[uint64]chan struct{}), 41 maxStream: protocol.InvalidStreamNum, 42 nextStream: 1, 43 newStream: newStream, 44 queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, 45 } 46} 47 48func (m *outgoingBidiStreamsMap) OpenStream() (streamI, error) { 49 m.mutex.Lock() 50 defer m.mutex.Unlock() 51 52 if m.closeErr != nil { 53 return nil, m.closeErr 54 } 55 56 // if there are OpenStreamSync calls waiting, return an error here 57 if len(m.openQueue) > 0 || m.nextStream > m.maxStream { 58 m.maybeSendBlockedFrame() 59 return nil, streamOpenErr{errTooManyOpenStreams} 60 } 61 return m.openStream(), nil 62} 63 64func (m *outgoingBidiStreamsMap) OpenStreamSync(ctx context.Context) (streamI, error) { 65 m.mutex.Lock() 66 defer m.mutex.Unlock() 67 68 if m.closeErr != nil { 69 return nil, m.closeErr 70 } 71 72 if err := ctx.Err(); err != nil { 73 return nil, err 74 } 75 76 if len(m.openQueue) == 0 && m.nextStream <= m.maxStream { 77 return m.openStream(), nil 78 } 79 80 waitChan := make(chan struct{}, 1) 81 queuePos := m.highestInQueue 82 m.highestInQueue++ 83 if len(m.openQueue) == 0 { 84 m.lowestInQueue = queuePos 85 } 86 m.openQueue[queuePos] = waitChan 87 m.maybeSendBlockedFrame() 88 89 for { 90 m.mutex.Unlock() 91 select { 92 case <-ctx.Done(): 93 m.mutex.Lock() 94 delete(m.openQueue, queuePos) 95 return nil, ctx.Err() 96 case <-waitChan: 97 } 98 m.mutex.Lock() 99 100 if m.closeErr != nil { 101 return nil, m.closeErr 102 } 103 if m.nextStream > m.maxStream { 104 // no stream available. Continue waiting 105 continue 106 } 107 str := m.openStream() 108 delete(m.openQueue, queuePos) 109 m.lowestInQueue = queuePos + 1 110 m.unblockOpenSync() 111 return str, nil 112 } 113} 114 115func (m *outgoingBidiStreamsMap) openStream() streamI { 116 s := m.newStream(m.nextStream) 117 m.streams[m.nextStream] = s 118 m.nextStream++ 119 return s 120} 121 122// maybeSendBlockedFrame queues a STREAMS_BLOCKED frame for the current stream offset, 123// if we haven't sent one for this offset yet 124func (m *outgoingBidiStreamsMap) maybeSendBlockedFrame() { 125 if m.blockedSent { 126 return 127 } 128 129 var streamNum protocol.StreamNum 130 if m.maxStream != protocol.InvalidStreamNum { 131 streamNum = m.maxStream 132 } 133 m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ 134 Type: protocol.StreamTypeBidi, 135 StreamLimit: streamNum, 136 }) 137 m.blockedSent = true 138} 139 140func (m *outgoingBidiStreamsMap) GetStream(num protocol.StreamNum) (streamI, error) { 141 m.mutex.RLock() 142 if num >= m.nextStream { 143 m.mutex.RUnlock() 144 return nil, streamError{ 145 message: "peer attempted to open stream %d", 146 nums: []protocol.StreamNum{num}, 147 } 148 } 149 s := m.streams[num] 150 m.mutex.RUnlock() 151 return s, nil 152} 153 154func (m *outgoingBidiStreamsMap) DeleteStream(num protocol.StreamNum) error { 155 m.mutex.Lock() 156 defer m.mutex.Unlock() 157 158 if _, ok := m.streams[num]; !ok { 159 return streamError{ 160 message: "tried to delete unknown outgoing stream %d", 161 nums: []protocol.StreamNum{num}, 162 } 163 } 164 delete(m.streams, num) 165 return nil 166} 167 168func (m *outgoingBidiStreamsMap) SetMaxStream(num protocol.StreamNum) { 169 m.mutex.Lock() 170 defer m.mutex.Unlock() 171 172 if num <= m.maxStream { 173 return 174 } 175 m.maxStream = num 176 m.blockedSent = false 177 if m.maxStream < m.nextStream-1+protocol.StreamNum(len(m.openQueue)) { 178 m.maybeSendBlockedFrame() 179 } 180 m.unblockOpenSync() 181} 182 183// UpdateSendWindow is called when the peer's transport parameters are received. 184// Only in the case of a 0-RTT handshake will we have open streams at this point. 185// We might need to update the send window, in case the server increased it. 186func (m *outgoingBidiStreamsMap) UpdateSendWindow(limit protocol.ByteCount) { 187 m.mutex.Lock() 188 for _, str := range m.streams { 189 str.updateSendWindow(limit) 190 } 191 m.mutex.Unlock() 192} 193 194// unblockOpenSync unblocks the next OpenStreamSync go-routine to open a new stream 195func (m *outgoingBidiStreamsMap) unblockOpenSync() { 196 if len(m.openQueue) == 0 { 197 return 198 } 199 for qp := m.lowestInQueue; qp <= m.highestInQueue; qp++ { 200 c, ok := m.openQueue[qp] 201 if !ok { // entry was deleted because the context was canceled 202 continue 203 } 204 // unblockOpenSync is called both from OpenStreamSync and from SetMaxStream. 205 // It's sufficient to only unblock OpenStreamSync once. 206 select { 207 case c <- struct{}{}: 208 default: 209 } 210 return 211 } 212} 213 214func (m *outgoingBidiStreamsMap) CloseWithError(err error) { 215 m.mutex.Lock() 216 m.closeErr = err 217 for _, str := range m.streams { 218 str.closeForShutdown(err) 219 } 220 for _, c := range m.openQueue { 221 if c != nil { 222 close(c) 223 } 224 } 225 m.mutex.Unlock() 226} 227