1package quic 2 3import ( 4 "fmt" 5 "io" 6 "sync" 7 "time" 8 9 "github.com/lucas-clemente/quic-go/internal/flowcontrol" 10 "github.com/lucas-clemente/quic-go/internal/protocol" 11 "github.com/lucas-clemente/quic-go/internal/qerr" 12 "github.com/lucas-clemente/quic-go/internal/utils" 13 "github.com/lucas-clemente/quic-go/internal/wire" 14) 15 16type receiveStreamI interface { 17 ReceiveStream 18 19 handleStreamFrame(*wire.StreamFrame) error 20 handleResetStreamFrame(*wire.ResetStreamFrame) error 21 closeForShutdown(error) 22 getWindowUpdate() protocol.ByteCount 23} 24 25type receiveStream struct { 26 mutex sync.Mutex 27 28 streamID protocol.StreamID 29 30 sender streamSender 31 32 frameQueue *frameSorter 33 finalOffset protocol.ByteCount 34 35 currentFrame []byte 36 currentFrameDone func() 37 currentFrameIsLast bool // is the currentFrame the last frame on this stream 38 readPosInFrame int 39 40 closeForShutdownErr error 41 cancelReadErr error 42 resetRemotelyErr *StreamError 43 44 closedForShutdown bool // set when CloseForShutdown() is called 45 finRead bool // set once we read a frame with a Fin 46 canceledRead bool // set when CancelRead() is called 47 resetRemotely bool // set when HandleResetStreamFrame() is called 48 49 readChan chan struct{} 50 deadline time.Time 51 52 flowController flowcontrol.StreamFlowController 53 version protocol.VersionNumber 54} 55 56var ( 57 _ ReceiveStream = &receiveStream{} 58 _ receiveStreamI = &receiveStream{} 59) 60 61func newReceiveStream( 62 streamID protocol.StreamID, 63 sender streamSender, 64 flowController flowcontrol.StreamFlowController, 65 version protocol.VersionNumber, 66) *receiveStream { 67 return &receiveStream{ 68 streamID: streamID, 69 sender: sender, 70 flowController: flowController, 71 frameQueue: newFrameSorter(), 72 readChan: make(chan struct{}, 1), 73 finalOffset: protocol.MaxByteCount, 74 version: version, 75 } 76} 77 78func (s *receiveStream) StreamID() protocol.StreamID { 79 return s.streamID 80} 81 82// Read implements io.Reader. It is not thread safe! 83func (s *receiveStream) Read(p []byte) (int, error) { 84 s.mutex.Lock() 85 completed, n, err := s.readImpl(p) 86 s.mutex.Unlock() 87 88 if completed { 89 s.sender.onStreamCompleted(s.streamID) 90 } 91 return n, err 92} 93 94func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) { 95 if s.finRead { 96 return false, 0, io.EOF 97 } 98 if s.canceledRead { 99 return false, 0, s.cancelReadErr 100 } 101 if s.resetRemotely { 102 return false, 0, s.resetRemotelyErr 103 } 104 if s.closedForShutdown { 105 return false, 0, s.closeForShutdownErr 106 } 107 108 bytesRead := 0 109 var deadlineTimer *utils.Timer 110 for bytesRead < len(p) { 111 if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) { 112 s.dequeueNextFrame() 113 } 114 if s.currentFrame == nil && bytesRead > 0 { 115 return false, bytesRead, s.closeForShutdownErr 116 } 117 118 for { 119 // Stop waiting on errors 120 if s.closedForShutdown { 121 return false, bytesRead, s.closeForShutdownErr 122 } 123 if s.canceledRead { 124 return false, bytesRead, s.cancelReadErr 125 } 126 if s.resetRemotely { 127 return false, bytesRead, s.resetRemotelyErr 128 } 129 130 deadline := s.deadline 131 if !deadline.IsZero() { 132 if !time.Now().Before(deadline) { 133 return false, bytesRead, errDeadline 134 } 135 if deadlineTimer == nil { 136 deadlineTimer = utils.NewTimer() 137 defer deadlineTimer.Stop() 138 } 139 deadlineTimer.Reset(deadline) 140 } 141 142 if s.currentFrame != nil || s.currentFrameIsLast { 143 break 144 } 145 146 s.mutex.Unlock() 147 if deadline.IsZero() { 148 <-s.readChan 149 } else { 150 select { 151 case <-s.readChan: 152 case <-deadlineTimer.Chan(): 153 deadlineTimer.SetRead() 154 } 155 } 156 s.mutex.Lock() 157 if s.currentFrame == nil { 158 s.dequeueNextFrame() 159 } 160 } 161 162 if bytesRead > len(p) { 163 return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p)) 164 } 165 if s.readPosInFrame > len(s.currentFrame) { 166 return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame)) 167 } 168 169 s.mutex.Unlock() 170 171 m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:]) 172 s.readPosInFrame += m 173 bytesRead += m 174 175 s.mutex.Lock() 176 // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream 177 if !s.resetRemotely { 178 s.flowController.AddBytesRead(protocol.ByteCount(m)) 179 } 180 181 if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast { 182 s.finRead = true 183 return true, bytesRead, io.EOF 184 } 185 } 186 return false, bytesRead, nil 187} 188 189func (s *receiveStream) dequeueNextFrame() { 190 var offset protocol.ByteCount 191 // We're done with the last frame. Release the buffer. 192 if s.currentFrameDone != nil { 193 s.currentFrameDone() 194 } 195 offset, s.currentFrame, s.currentFrameDone = s.frameQueue.Pop() 196 s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset 197 s.readPosInFrame = 0 198} 199 200func (s *receiveStream) CancelRead(errorCode StreamErrorCode) { 201 s.mutex.Lock() 202 completed := s.cancelReadImpl(errorCode) 203 s.mutex.Unlock() 204 205 if completed { 206 s.flowController.Abandon() 207 s.sender.onStreamCompleted(s.streamID) 208 } 209} 210 211func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) bool /* completed */ { 212 if s.finRead || s.canceledRead || s.resetRemotely { 213 return false 214 } 215 s.canceledRead = true 216 s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode) 217 s.signalRead() 218 s.sender.queueControlFrame(&wire.StopSendingFrame{ 219 StreamID: s.streamID, 220 ErrorCode: errorCode, 221 }) 222 // We're done with this stream if the final offset was already received. 223 return s.finalOffset != protocol.MaxByteCount 224} 225 226func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error { 227 s.mutex.Lock() 228 completed, err := s.handleStreamFrameImpl(frame) 229 s.mutex.Unlock() 230 231 if completed { 232 s.flowController.Abandon() 233 s.sender.onStreamCompleted(s.streamID) 234 } 235 return err 236} 237 238func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) { 239 maxOffset := frame.Offset + frame.DataLen() 240 if err := s.flowController.UpdateHighestReceived(maxOffset, frame.Fin); err != nil { 241 return false, err 242 } 243 var newlyRcvdFinalOffset bool 244 if frame.Fin { 245 newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount 246 s.finalOffset = maxOffset 247 } 248 if s.canceledRead { 249 return newlyRcvdFinalOffset, nil 250 } 251 if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil { 252 return false, err 253 } 254 s.signalRead() 255 return false, nil 256} 257 258func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error { 259 s.mutex.Lock() 260 completed, err := s.handleResetStreamFrameImpl(frame) 261 s.mutex.Unlock() 262 263 if completed { 264 s.flowController.Abandon() 265 s.sender.onStreamCompleted(s.streamID) 266 } 267 return err 268} 269 270func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) { 271 if s.closedForShutdown { 272 return false, nil 273 } 274 if err := s.flowController.UpdateHighestReceived(frame.FinalSize, true); err != nil { 275 return false, err 276 } 277 newlyRcvdFinalOffset := s.finalOffset == protocol.MaxByteCount 278 s.finalOffset = frame.FinalSize 279 280 // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset) 281 if s.resetRemotely { 282 return false, nil 283 } 284 s.resetRemotely = true 285 s.resetRemotelyErr = &StreamError{ 286 StreamID: s.streamID, 287 ErrorCode: frame.ErrorCode, 288 } 289 s.signalRead() 290 return newlyRcvdFinalOffset, nil 291} 292 293func (s *receiveStream) CloseRemote(offset protocol.ByteCount) { 294 s.handleStreamFrame(&wire.StreamFrame{Fin: true, Offset: offset}) 295} 296 297func (s *receiveStream) SetReadDeadline(t time.Time) error { 298 s.mutex.Lock() 299 s.deadline = t 300 s.mutex.Unlock() 301 s.signalRead() 302 return nil 303} 304 305// CloseForShutdown closes a stream abruptly. 306// It makes Read unblock (and return the error) immediately. 307// The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET. 308func (s *receiveStream) closeForShutdown(err error) { 309 s.mutex.Lock() 310 s.closedForShutdown = true 311 s.closeForShutdownErr = err 312 s.mutex.Unlock() 313 s.signalRead() 314} 315 316func (s *receiveStream) getWindowUpdate() protocol.ByteCount { 317 return s.flowController.GetWindowUpdate() 318} 319 320// signalRead performs a non-blocking send on the readChan 321func (s *receiveStream) signalRead() { 322 select { 323 case s.readChan <- struct{}{}: 324 default: 325 } 326} 327