1package quic 2 3import ( 4 "errors" 5 "sync" 6 7 "github.com/lucas-clemente/quic-go/internal/ackhandler" 8 "github.com/lucas-clemente/quic-go/internal/protocol" 9 "github.com/lucas-clemente/quic-go/internal/wire" 10 "github.com/lucas-clemente/quic-go/quicvarint" 11) 12 13type framer interface { 14 HasData() bool 15 16 QueueControlFrame(wire.Frame) 17 AppendControlFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) 18 19 AddActiveStream(protocol.StreamID) 20 AppendStreamFrames([]ackhandler.Frame, protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) 21 22 Handle0RTTRejection() error 23} 24 25type framerI struct { 26 mutex sync.Mutex 27 28 streamGetter streamGetter 29 version protocol.VersionNumber 30 31 activeStreams map[protocol.StreamID]struct{} 32 streamQueue []protocol.StreamID 33 34 controlFrameMutex sync.Mutex 35 controlFrames []wire.Frame 36} 37 38var _ framer = &framerI{} 39 40func newFramer( 41 streamGetter streamGetter, 42 v protocol.VersionNumber, 43) framer { 44 return &framerI{ 45 streamGetter: streamGetter, 46 activeStreams: make(map[protocol.StreamID]struct{}), 47 version: v, 48 } 49} 50 51func (f *framerI) HasData() bool { 52 f.mutex.Lock() 53 hasData := len(f.streamQueue) > 0 54 f.mutex.Unlock() 55 if hasData { 56 return true 57 } 58 f.controlFrameMutex.Lock() 59 hasData = len(f.controlFrames) > 0 60 f.controlFrameMutex.Unlock() 61 return hasData 62} 63 64func (f *framerI) QueueControlFrame(frame wire.Frame) { 65 f.controlFrameMutex.Lock() 66 f.controlFrames = append(f.controlFrames, frame) 67 f.controlFrameMutex.Unlock() 68} 69 70func (f *framerI) AppendControlFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { 71 var length protocol.ByteCount 72 f.controlFrameMutex.Lock() 73 for len(f.controlFrames) > 0 { 74 frame := f.controlFrames[len(f.controlFrames)-1] 75 frameLen := frame.Length(f.version) 76 if length+frameLen > maxLen { 77 break 78 } 79 frames = append(frames, ackhandler.Frame{Frame: frame}) 80 length += frameLen 81 f.controlFrames = f.controlFrames[:len(f.controlFrames)-1] 82 } 83 f.controlFrameMutex.Unlock() 84 return frames, length 85} 86 87func (f *framerI) AddActiveStream(id protocol.StreamID) { 88 f.mutex.Lock() 89 if _, ok := f.activeStreams[id]; !ok { 90 f.streamQueue = append(f.streamQueue, id) 91 f.activeStreams[id] = struct{}{} 92 } 93 f.mutex.Unlock() 94} 95 96func (f *framerI) AppendStreamFrames(frames []ackhandler.Frame, maxLen protocol.ByteCount) ([]ackhandler.Frame, protocol.ByteCount) { 97 var length protocol.ByteCount 98 var lastFrame *ackhandler.Frame 99 f.mutex.Lock() 100 // pop STREAM frames, until less than MinStreamFrameSize bytes are left in the packet 101 numActiveStreams := len(f.streamQueue) 102 for i := 0; i < numActiveStreams; i++ { 103 if protocol.MinStreamFrameSize+length > maxLen { 104 break 105 } 106 id := f.streamQueue[0] 107 f.streamQueue = f.streamQueue[1:] 108 // This should never return an error. Better check it anyway. 109 // The stream will only be in the streamQueue, if it enqueued itself there. 110 str, err := f.streamGetter.GetOrOpenSendStream(id) 111 // The stream can be nil if it completed after it said it had data. 112 if str == nil || err != nil { 113 delete(f.activeStreams, id) 114 continue 115 } 116 remainingLen := maxLen - length 117 // For the last STREAM frame, we'll remove the DataLen field later. 118 // Therefore, we can pretend to have more bytes available when popping 119 // the STREAM frame (which will always have the DataLen set). 120 remainingLen += quicvarint.Len(uint64(remainingLen)) 121 frame, hasMoreData := str.popStreamFrame(remainingLen) 122 if hasMoreData { // put the stream back in the queue (at the end) 123 f.streamQueue = append(f.streamQueue, id) 124 } else { // no more data to send. Stream is not active any more 125 delete(f.activeStreams, id) 126 } 127 // The frame can be nil 128 // * if the receiveStream was canceled after it said it had data 129 // * the remaining size doesn't allow us to add another STREAM frame 130 if frame == nil { 131 continue 132 } 133 frames = append(frames, *frame) 134 length += frame.Length(f.version) 135 lastFrame = frame 136 } 137 f.mutex.Unlock() 138 if lastFrame != nil { 139 lastFrameLen := lastFrame.Length(f.version) 140 // account for the smaller size of the last STREAM frame 141 lastFrame.Frame.(*wire.StreamFrame).DataLenPresent = false 142 length += lastFrame.Length(f.version) - lastFrameLen 143 } 144 return frames, length 145} 146 147func (f *framerI) Handle0RTTRejection() error { 148 f.mutex.Lock() 149 defer f.mutex.Unlock() 150 151 f.controlFrameMutex.Lock() 152 f.streamQueue = f.streamQueue[:0] 153 for id := range f.activeStreams { 154 delete(f.activeStreams, id) 155 } 156 var j int 157 for i, frame := range f.controlFrames { 158 switch frame.(type) { 159 case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame: 160 return errors.New("didn't expect MAX_DATA / MAX_STREAM_DATA / MAX_STREAMS frame to be sent in 0-RTT") 161 case *wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame: 162 continue 163 default: 164 f.controlFrames[j] = f.controlFrames[i] 165 j++ 166 } 167 } 168 f.controlFrames = f.controlFrames[:j] 169 f.controlFrameMutex.Unlock() 170 return nil 171} 172