1// This file was automatically generated by genny. 2// Any changes will be lost if this file is regenerated. 3// see https://github.com/ooni/psiphon/oopsi/github.com/cheekybits/genny 4 5package quic 6 7import ( 8 "context" 9 "sync" 10 11 "github.com/ooni/psiphon/oopsi/github.com/Psiphon-Labs/quic-go/internal/protocol" 12 "github.com/ooni/psiphon/oopsi/github.com/Psiphon-Labs/quic-go/internal/wire" 13) 14 15type incomingUniStreamsMap struct { 16 mutex sync.RWMutex 17 newStreamChan chan struct{} 18 19 streams map[protocol.StreamNum]receiveStreamI 20 // When a stream is deleted before it was accepted, we can't delete it immediately. 21 // We need to wait until the application accepts it, and delete it immediately then. 22 streamsToDelete map[protocol.StreamNum]struct{} // used as a set 23 24 nextStreamToAccept protocol.StreamNum // the next stream that will be returned by AcceptStream() 25 nextStreamToOpen protocol.StreamNum // the highest stream that the peer openend 26 maxStream protocol.StreamNum // the highest stream that the peer is allowed to open 27 maxNumStreams uint64 // maximum number of streams 28 29 newStream func(protocol.StreamNum) receiveStreamI 30 queueMaxStreamID func(*wire.MaxStreamsFrame) 31 // streamNumToID func(protocol.StreamNum) protocol.StreamID // only used for generating errors 32 33 closeErr error 34} 35 36func newIncomingUniStreamsMap( 37 newStream func(protocol.StreamNum) receiveStreamI, 38 maxStreams uint64, 39 queueControlFrame func(wire.Frame), 40) *incomingUniStreamsMap { 41 return &incomingUniStreamsMap{ 42 newStreamChan: make(chan struct{}), 43 streams: make(map[protocol.StreamNum]receiveStreamI), 44 streamsToDelete: make(map[protocol.StreamNum]struct{}), 45 maxStream: protocol.StreamNum(maxStreams), 46 maxNumStreams: maxStreams, 47 newStream: newStream, 48 nextStreamToOpen: 1, 49 nextStreamToAccept: 1, 50 queueMaxStreamID: func(f *wire.MaxStreamsFrame) { queueControlFrame(f) }, 51 } 52} 53 54func (m *incomingUniStreamsMap) AcceptStream(ctx context.Context) (receiveStreamI, error) { 55 m.mutex.Lock() 56 57 var num protocol.StreamNum 58 var str receiveStreamI 59 for { 60 num = m.nextStreamToAccept 61 if m.closeErr != nil { 62 m.mutex.Unlock() 63 return nil, m.closeErr 64 } 65 var ok bool 66 str, ok = m.streams[num] 67 if ok { 68 break 69 } 70 m.mutex.Unlock() 71 select { 72 case <-ctx.Done(): 73 return nil, ctx.Err() 74 case <-m.newStreamChan: 75 } 76 m.mutex.Lock() 77 } 78 m.nextStreamToAccept++ 79 // If this stream was completed before being accepted, we can delete it now. 80 if _, ok := m.streamsToDelete[num]; ok { 81 delete(m.streamsToDelete, num) 82 if err := m.deleteStream(num); err != nil { 83 m.mutex.Unlock() 84 return nil, err 85 } 86 } 87 m.mutex.Unlock() 88 return str, nil 89} 90 91func (m *incomingUniStreamsMap) GetOrOpenStream(num protocol.StreamNum) (receiveStreamI, error) { 92 m.mutex.RLock() 93 if num > m.maxStream { 94 m.mutex.RUnlock() 95 return nil, streamError{ 96 message: "peer tried to open stream %d (current limit: %d)", 97 nums: []protocol.StreamNum{num, m.maxStream}, 98 } 99 } 100 // if the num is smaller than the highest we accepted 101 // * this stream exists in the map, and we can return it, or 102 // * this stream was already closed, then we can return the nil 103 if num < m.nextStreamToOpen { 104 var s receiveStreamI 105 // If the stream was already queued for deletion, and is just waiting to be accepted, don't return it. 106 if _, ok := m.streamsToDelete[num]; !ok { 107 s = m.streams[num] 108 } 109 m.mutex.RUnlock() 110 return s, nil 111 } 112 m.mutex.RUnlock() 113 114 m.mutex.Lock() 115 // no need to check the two error conditions from above again 116 // * maxStream can only increase, so if the id was valid before, it definitely is valid now 117 // * highestStream is only modified by this function 118 for newNum := m.nextStreamToOpen; newNum <= num; newNum++ { 119 m.streams[newNum] = m.newStream(newNum) 120 select { 121 case m.newStreamChan <- struct{}{}: 122 default: 123 } 124 } 125 m.nextStreamToOpen = num + 1 126 s := m.streams[num] 127 m.mutex.Unlock() 128 return s, nil 129} 130 131func (m *incomingUniStreamsMap) DeleteStream(num protocol.StreamNum) error { 132 m.mutex.Lock() 133 defer m.mutex.Unlock() 134 135 return m.deleteStream(num) 136} 137 138func (m *incomingUniStreamsMap) deleteStream(num protocol.StreamNum) error { 139 if _, ok := m.streams[num]; !ok { 140 return streamError{ 141 message: "Tried to delete unknown incoming stream %d", 142 nums: []protocol.StreamNum{num}, 143 } 144 } 145 146 // Don't delete this stream yet, if it was not yet accepted. 147 // Just save it to streamsToDelete map, to make sure it is deleted as soon as it gets accepted. 148 if num >= m.nextStreamToAccept { 149 if _, ok := m.streamsToDelete[num]; ok { 150 return streamError{ 151 message: "Tried to delete incoming stream %d multiple times", 152 nums: []protocol.StreamNum{num}, 153 } 154 } 155 m.streamsToDelete[num] = struct{}{} 156 return nil 157 } 158 159 delete(m.streams, num) 160 // queue a MAX_STREAM_ID frame, giving the peer the option to open a new stream 161 if m.maxNumStreams > uint64(len(m.streams)) { 162 numNewStreams := m.maxNumStreams - uint64(len(m.streams)) 163 m.maxStream = m.nextStreamToOpen + protocol.StreamNum(numNewStreams) - 1 164 m.queueMaxStreamID(&wire.MaxStreamsFrame{ 165 Type: protocol.StreamTypeUni, 166 MaxStreamNum: m.maxStream, 167 }) 168 } 169 return nil 170} 171 172func (m *incomingUniStreamsMap) CloseWithError(err error) { 173 m.mutex.Lock() 174 m.closeErr = err 175 for _, str := range m.streams { 176 str.closeForShutdown(err) 177 } 178 m.mutex.Unlock() 179 close(m.newStreamChan) 180} 181