1package quic
2
3import (
4	"fmt"
5	"io"
6	"sync"
7	"time"
8
9	"github.com/lucas-clemente/quic-go/internal/flowcontrol"
10	"github.com/lucas-clemente/quic-go/internal/protocol"
11	"github.com/lucas-clemente/quic-go/internal/qerr"
12	"github.com/lucas-clemente/quic-go/internal/utils"
13	"github.com/lucas-clemente/quic-go/internal/wire"
14)
15
16type receiveStreamI interface {
17	ReceiveStream
18
19	handleStreamFrame(*wire.StreamFrame) error
20	handleResetStreamFrame(*wire.ResetStreamFrame) error
21	closeForShutdown(error)
22	getWindowUpdate() protocol.ByteCount
23}
24
// receiveStream is the receiving side of a stream. It collects the data
// delivered by STREAM frames and hands it to the application through Read.
type receiveStream struct {
	// mutex is taken by Read, CancelRead, SetReadDeadline, closeForShutdown and
	// the frame handlers before they touch the fields below.
	mutex sync.Mutex

	streamID protocol.StreamID

	// sender is used to queue control frames (STOP_SENDING) and is notified
	// once the stream has completed.
	sender streamSender

	// frameQueue holds the data pushed by handleStreamFrame until it is popped
	// by dequeueNextFrame. finalOffset is the final size of the stream; it stays
	// at protocol.MaxByteCount until a FIN or a RESET_STREAM has been received.
	frameQueue  *frameSorter
	finalOffset protocol.ByteCount

	// State of the frame that Read is currently consuming.
	currentFrame       []byte
	currentFrameDone   func() // called by dequeueNextFrame once the frame has been consumed
	currentFrameIsLast bool   // is the currentFrame the last frame on this stream
	readPosInFrame     int    // offset of the next unread byte within currentFrame

	// Errors returned by Read once the matching flag below is set.
	closeForShutdownErr error
	cancelReadErr       error
	resetRemotelyErr    *StreamError

	closedForShutdown bool // set when closeForShutdown() is called
	finRead           bool // set once we read a frame with a Fin
	canceledRead      bool // set when CancelRead() is called
	resetRemotely     bool // set when handleResetStreamFrame() is called

	// readChan (buffered, capacity 1) wakes up a Read that is waiting for data
	// or for a state change. deadline is the read deadline; the zero value
	// means that no deadline is set.
	readChan chan struct{}
	deadline time.Time

	flowController flowcontrol.StreamFlowController
	version        protocol.VersionNumber
}
55
56var (
57	_ ReceiveStream  = &receiveStream{}
58	_ receiveStreamI = &receiveStream{}
59)
60
61func newReceiveStream(
62	streamID protocol.StreamID,
63	sender streamSender,
64	flowController flowcontrol.StreamFlowController,
65	version protocol.VersionNumber,
66) *receiveStream {
67	return &receiveStream{
68		streamID:       streamID,
69		sender:         sender,
70		flowController: flowController,
71		frameQueue:     newFrameSorter(),
72		readChan:       make(chan struct{}, 1),
73		finalOffset:    protocol.MaxByteCount,
74		version:        version,
75	}
76}
77
78func (s *receiveStream) StreamID() protocol.StreamID {
79	return s.streamID
80}
81
82// Read implements io.Reader. It is not thread safe!
83func (s *receiveStream) Read(p []byte) (int, error) {
84	s.mutex.Lock()
85	completed, n, err := s.readImpl(p)
86	s.mutex.Unlock()
87
88	if completed {
89		s.sender.onStreamCompleted(s.streamID)
90	}
91	return n, err
92}
93
// readImpl does the actual work of Read. It must be called with s.mutex held.
// The mutex is released while waiting for new data and while copying into p,
// and it is held again on every return path.
//
// It returns whether this call completed the stream (the last frame was read
// to its end), the number of bytes copied into p, and the error to return
// from Read.
func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
	// A stream that already reached a terminal state fails fast.
	if s.finRead {
		return false, 0, io.EOF
	}
	if s.canceledRead {
		return false, 0, s.cancelReadErr
	}
	if s.resetRemotely {
		return false, 0, s.resetRemotelyErr
	}
	if s.closedForShutdown {
		return false, 0, s.closeForShutdownErr
	}

	bytesRead := 0
	var deadlineTimer *utils.Timer
	for bytesRead < len(p) {
		// Move on to the next frame once the current one is used up.
		if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
			s.dequeueNextFrame()
		}
		// No more data queued, but some bytes were already copied: return them
		// instead of blocking. closeForShutdownErr is nil unless
		// closeForShutdown was called.
		if s.currentFrame == nil && bytesRead > 0 {
			return false, bytesRead, s.closeForShutdownErr
		}

		// Wait until a frame is available, or until the stream state or the
		// deadline tells us to stop waiting.
		for {
			// Stop waiting on errors
			if s.closedForShutdown {
				return false, bytesRead, s.closeForShutdownErr
			}
			if s.canceledRead {
				return false, bytesRead, s.cancelReadErr
			}
			if s.resetRemotely {
				return false, bytesRead, s.resetRemotelyErr
			}

			// Re-read the deadline on every iteration: SetReadDeadline may have
			// changed it while we were waiting.
			deadline := s.deadline
			if !deadline.IsZero() {
				if !time.Now().Before(deadline) {
					return false, bytesRead, errDeadline
				}
				// The timer is created lazily and stopped when readImpl returns.
				if deadlineTimer == nil {
					deadlineTimer = utils.NewTimer()
					defer deadlineTimer.Stop()
				}
				deadlineTimer.Reset(deadline)
			}

			// There is something to read, or the (possibly empty) last frame was
			// dequeued.
			if s.currentFrame != nil || s.currentFrameIsLast {
				break
			}

			// Block without holding the mutex until signalRead wakes us up or the
			// deadline timer fires.
			s.mutex.Unlock()
			if deadline.IsZero() {
				<-s.readChan
			} else {
				select {
				case <-s.readChan:
				case <-deadlineTimer.Chan():
					// NOTE(review): presumably records that the timer channel
					// was drained, so that a later Reset behaves correctly —
					// confirm against utils.Timer.
					deadlineTimer.SetRead()
				}
			}
			s.mutex.Lock()
			if s.currentFrame == nil {
				s.dequeueNextFrame()
			}
		}

		// Sanity checks on the read positions before slicing below.
		if bytesRead > len(p) {
			return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
		}
		if s.readPosInFrame > len(s.currentFrame) {
			return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
		}

		// The copy into p is performed without holding the mutex.
		s.mutex.Unlock()

		m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
		s.readPosInFrame += m
		bytesRead += m

		s.mutex.Lock()
		// when a RESET_STREAM was received, the flow controller was already informed about the final byteOffset for this stream
		if !s.resetRemotely {
			s.flowController.AddBytesRead(protocol.ByteCount(m))
		}

		// The last frame was read to its end: the stream is complete.
		if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
			s.finRead = true
			return true, bytesRead, io.EOF
		}
	}
	return false, bytesRead, nil
}
188
189func (s *receiveStream) dequeueNextFrame() {
190	var offset protocol.ByteCount
191	// We're done with the last frame. Release the buffer.
192	if s.currentFrameDone != nil {
193		s.currentFrameDone()
194	}
195	offset, s.currentFrame, s.currentFrameDone = s.frameQueue.Pop()
196	s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset
197	s.readPosInFrame = 0
198}
199
200func (s *receiveStream) CancelRead(errorCode StreamErrorCode) {
201	s.mutex.Lock()
202	completed := s.cancelReadImpl(errorCode)
203	s.mutex.Unlock()
204
205	if completed {
206		s.flowController.Abandon()
207		s.sender.onStreamCompleted(s.streamID)
208	}
209}
210
211func (s *receiveStream) cancelReadImpl(errorCode qerr.StreamErrorCode) bool /* completed */ {
212	if s.finRead || s.canceledRead || s.resetRemotely {
213		return false
214	}
215	s.canceledRead = true
216	s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
217	s.signalRead()
218	s.sender.queueControlFrame(&wire.StopSendingFrame{
219		StreamID:  s.streamID,
220		ErrorCode: errorCode,
221	})
222	// We're done with this stream if the final offset was already received.
223	return s.finalOffset != protocol.MaxByteCount
224}
225
226func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
227	s.mutex.Lock()
228	completed, err := s.handleStreamFrameImpl(frame)
229	s.mutex.Unlock()
230
231	if completed {
232		s.flowController.Abandon()
233		s.sender.onStreamCompleted(s.streamID)
234	}
235	return err
236}
237
238func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
239	maxOffset := frame.Offset + frame.DataLen()
240	if err := s.flowController.UpdateHighestReceived(maxOffset, frame.Fin); err != nil {
241		return false, err
242	}
243	var newlyRcvdFinalOffset bool
244	if frame.Fin {
245		newlyRcvdFinalOffset = s.finalOffset == protocol.MaxByteCount
246		s.finalOffset = maxOffset
247	}
248	if s.canceledRead {
249		return newlyRcvdFinalOffset, nil
250	}
251	if err := s.frameQueue.Push(frame.Data, frame.Offset, frame.PutBack); err != nil {
252		return false, err
253	}
254	s.signalRead()
255	return false, nil
256}
257
258func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
259	s.mutex.Lock()
260	completed, err := s.handleResetStreamFrameImpl(frame)
261	s.mutex.Unlock()
262
263	if completed {
264		s.flowController.Abandon()
265		s.sender.onStreamCompleted(s.streamID)
266	}
267	return err
268}
269
270func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
271	if s.closedForShutdown {
272		return false, nil
273	}
274	if err := s.flowController.UpdateHighestReceived(frame.FinalSize, true); err != nil {
275		return false, err
276	}
277	newlyRcvdFinalOffset := s.finalOffset == protocol.MaxByteCount
278	s.finalOffset = frame.FinalSize
279
280	// ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
281	if s.resetRemotely {
282		return false, nil
283	}
284	s.resetRemotely = true
285	s.resetRemotelyErr = &StreamError{
286		StreamID:  s.streamID,
287		ErrorCode: frame.ErrorCode,
288	}
289	s.signalRead()
290	return newlyRcvdFinalOffset, nil
291}
292
293func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
294	s.handleStreamFrame(&wire.StreamFrame{Fin: true, Offset: offset})
295}
296
297func (s *receiveStream) SetReadDeadline(t time.Time) error {
298	s.mutex.Lock()
299	s.deadline = t
300	s.mutex.Unlock()
301	s.signalRead()
302	return nil
303}
304
305// CloseForShutdown closes a stream abruptly.
306// It makes Read unblock (and return the error) immediately.
307// The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
308func (s *receiveStream) closeForShutdown(err error) {
309	s.mutex.Lock()
310	s.closedForShutdown = true
311	s.closeForShutdownErr = err
312	s.mutex.Unlock()
313	s.signalRead()
314}
315
316func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
317	return s.flowController.GetWindowUpdate()
318}
319
320// signalRead performs a non-blocking send on the readChan
321func (s *receiveStream) signalRead() {
322	select {
323	case s.readChan <- struct{}{}:
324	default:
325	}
326}
327