1package quic
2
3import (
4	"errors"
5	"io"
6	"runtime"
7	"time"
8
9	"github.com/golang/mock/gomock"
10	"github.com/lucas-clemente/quic-go/internal/mocks"
11	"github.com/lucas-clemente/quic-go/internal/protocol"
12	"github.com/lucas-clemente/quic-go/internal/wire"
13
14	. "github.com/onsi/ginkgo"
15	. "github.com/onsi/gomega"
16	"github.com/onsi/gomega/gbytes"
17)
18
19var _ = Describe("Receive Stream", func() {
20	const streamID protocol.StreamID = 1337
21
22	var (
23		str            *receiveStream
24		strWithTimeout io.Reader // str wrapped with gbytes.TimeoutReader
25		mockFC         *mocks.MockStreamFlowController
26		mockSender     *MockStreamSender
27	)
28
29	BeforeEach(func() {
30		mockSender = NewMockStreamSender(mockCtrl)
31		mockFC = mocks.NewMockStreamFlowController(mockCtrl)
32		str = newReceiveStream(streamID, mockSender, mockFC, protocol.VersionWhatever)
33
34		timeout := scaleDuration(250 * time.Millisecond)
35		strWithTimeout = gbytes.TimeoutReader(str, timeout)
36	})
37
38	It("gets stream id", func() {
39		Expect(str.StreamID()).To(Equal(protocol.StreamID(1337)))
40	})
41
42	Context("reading", func() {
43		It("reads a single STREAM frame", func() {
44			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
45			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
46			frame := wire.StreamFrame{
47				Offset: 0,
48				Data:   []byte{0xDE, 0xAD, 0xBE, 0xEF},
49			}
50			err := str.handleStreamFrame(&frame)
51			Expect(err).ToNot(HaveOccurred())
52			b := make([]byte, 4)
53			n, err := strWithTimeout.Read(b)
54			Expect(err).ToNot(HaveOccurred())
55			Expect(n).To(Equal(4))
56			Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
57		})
58
59		It("reads a single STREAM frame in multiple goes", func() {
60			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
61			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
62			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
63			frame := wire.StreamFrame{
64				Offset: 0,
65				Data:   []byte{0xDE, 0xAD, 0xBE, 0xEF},
66			}
67			err := str.handleStreamFrame(&frame)
68			Expect(err).ToNot(HaveOccurred())
69			b := make([]byte, 2)
70			n, err := strWithTimeout.Read(b)
71			Expect(err).ToNot(HaveOccurred())
72			Expect(n).To(Equal(2))
73			Expect(b).To(Equal([]byte{0xDE, 0xAD}))
74			n, err = strWithTimeout.Read(b)
75			Expect(err).ToNot(HaveOccurred())
76			Expect(n).To(Equal(2))
77			Expect(b).To(Equal([]byte{0xBE, 0xEF}))
78		})
79
80		It("reads all data available", func() {
81			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
82			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
83			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
84			frame1 := wire.StreamFrame{
85				Offset: 0,
86				Data:   []byte{0xDE, 0xAD},
87			}
88			frame2 := wire.StreamFrame{
89				Offset: 2,
90				Data:   []byte{0xBE, 0xEF},
91			}
92			err := str.handleStreamFrame(&frame1)
93			Expect(err).ToNot(HaveOccurred())
94			err = str.handleStreamFrame(&frame2)
95			Expect(err).ToNot(HaveOccurred())
96			b := make([]byte, 6)
97			n, err := strWithTimeout.Read(b)
98			Expect(err).ToNot(HaveOccurred())
99			Expect(n).To(Equal(4))
100			Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00}))
101		})
102
103		It("assembles multiple STREAM frames", func() {
104			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
105			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
106			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
107			frame1 := wire.StreamFrame{
108				Offset: 0,
109				Data:   []byte{0xDE, 0xAD},
110			}
111			frame2 := wire.StreamFrame{
112				Offset: 2,
113				Data:   []byte{0xBE, 0xEF},
114			}
115			err := str.handleStreamFrame(&frame1)
116			Expect(err).ToNot(HaveOccurred())
117			err = str.handleStreamFrame(&frame2)
118			Expect(err).ToNot(HaveOccurred())
119			b := make([]byte, 4)
120			n, err := strWithTimeout.Read(b)
121			Expect(err).ToNot(HaveOccurred())
122			Expect(n).To(Equal(4))
123			Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
124		})
125
126		It("waits until data is available", func() {
127			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
128			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
129			go func() {
130				defer GinkgoRecover()
131				frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}}
132				time.Sleep(10 * time.Millisecond)
133				err := str.handleStreamFrame(&frame)
134				Expect(err).ToNot(HaveOccurred())
135			}()
136			b := make([]byte, 2)
137			n, err := strWithTimeout.Read(b)
138			Expect(err).ToNot(HaveOccurred())
139			Expect(n).To(Equal(2))
140		})
141
142		It("handles STREAM frames in wrong order", func() {
143			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
144			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
145			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
146			frame1 := wire.StreamFrame{
147				Offset: 2,
148				Data:   []byte{0xBE, 0xEF},
149			}
150			frame2 := wire.StreamFrame{
151				Offset: 0,
152				Data:   []byte{0xDE, 0xAD},
153			}
154			err := str.handleStreamFrame(&frame1)
155			Expect(err).ToNot(HaveOccurred())
156			err = str.handleStreamFrame(&frame2)
157			Expect(err).ToNot(HaveOccurred())
158			b := make([]byte, 4)
159			n, err := strWithTimeout.Read(b)
160			Expect(err).ToNot(HaveOccurred())
161			Expect(n).To(Equal(4))
162			Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
163		})
164
165		It("ignores duplicate STREAM frames", func() {
166			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
167			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
168			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
169			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
170			frame1 := wire.StreamFrame{
171				Offset: 0,
172				Data:   []byte{0xDE, 0xAD},
173			}
174			frame2 := wire.StreamFrame{
175				Offset: 0,
176				Data:   []byte{0x13, 0x37},
177			}
178			frame3 := wire.StreamFrame{
179				Offset: 2,
180				Data:   []byte{0xBE, 0xEF},
181			}
182			err := str.handleStreamFrame(&frame1)
183			Expect(err).ToNot(HaveOccurred())
184			err = str.handleStreamFrame(&frame2)
185			Expect(err).ToNot(HaveOccurred())
186			err = str.handleStreamFrame(&frame3)
187			Expect(err).ToNot(HaveOccurred())
188			b := make([]byte, 4)
189			n, err := strWithTimeout.Read(b)
190			Expect(err).ToNot(HaveOccurred())
191			Expect(n).To(Equal(4))
192			Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
193		})
194
195		It("doesn't rejects a STREAM frames with an overlapping data range", func() {
196			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
197			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false)
198			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
199			mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
200			frame1 := wire.StreamFrame{
201				Offset: 0,
202				Data:   []byte("foob"),
203			}
204			frame2 := wire.StreamFrame{
205				Offset: 2,
206				Data:   []byte("obar"),
207			}
208			err := str.handleStreamFrame(&frame1)
209			Expect(err).ToNot(HaveOccurred())
210			err = str.handleStreamFrame(&frame2)
211			Expect(err).ToNot(HaveOccurred())
212			b := make([]byte, 6)
213			n, err := strWithTimeout.Read(b)
214			Expect(err).ToNot(HaveOccurred())
215			Expect(n).To(Equal(6))
216			Expect(b).To(Equal([]byte("foobar")))
217		})
218
219		Context("deadlines", func() {
220			It("the deadline error has the right net.Error properties", func() {
221				Expect(errDeadline.Temporary()).To(BeTrue())
222				Expect(errDeadline.Timeout()).To(BeTrue())
223				Expect(errDeadline).To(MatchError("deadline exceeded"))
224			})
225
226			It("returns an error when Read is called after the deadline", func() {
227				mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false).AnyTimes()
228				f := &wire.StreamFrame{Data: []byte("foobar")}
229				err := str.handleStreamFrame(f)
230				Expect(err).ToNot(HaveOccurred())
231				str.SetReadDeadline(time.Now().Add(-time.Second))
232				b := make([]byte, 6)
233				n, err := strWithTimeout.Read(b)
234				Expect(err).To(MatchError(errDeadline))
235				Expect(n).To(BeZero())
236			})
237
238			It("unblocks when the deadline is changed to the past", func() {
239				str.SetReadDeadline(time.Now().Add(time.Hour))
240				done := make(chan struct{})
241				go func() {
242					defer GinkgoRecover()
243					_, err := str.Read(make([]byte, 6))
244					Expect(err).To(MatchError(errDeadline))
245					close(done)
246				}()
247				Consistently(done).ShouldNot(BeClosed())
248				str.SetReadDeadline(time.Now().Add(-time.Hour))
249				Eventually(done).Should(BeClosed())
250			})
251
252			It("unblocks after the deadline", func() {
253				deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
254				str.SetReadDeadline(deadline)
255				b := make([]byte, 6)
256				n, err := strWithTimeout.Read(b)
257				Expect(err).To(MatchError(errDeadline))
258				Expect(n).To(BeZero())
259				Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
260			})
261
262			It("doesn't unblock if the deadline is changed before the first one expires", func() {
263				deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond))
264				deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond))
265				str.SetReadDeadline(deadline1)
266				go func() {
267					defer GinkgoRecover()
268					time.Sleep(scaleDuration(20 * time.Millisecond))
269					str.SetReadDeadline(deadline2)
270					// make sure that this was actually execute before the deadline expires
271					Expect(time.Now()).To(BeTemporally("<", deadline1))
272				}()
273				runtime.Gosched()
274				b := make([]byte, 10)
275				n, err := strWithTimeout.Read(b)
276				Expect(err).To(MatchError(errDeadline))
277				Expect(n).To(BeZero())
278				Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
279			})
280
281			It("unblocks earlier, when a new deadline is set", func() {
282				deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond))
283				deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond))
284				go func() {
285					defer GinkgoRecover()
286					time.Sleep(scaleDuration(10 * time.Millisecond))
287					str.SetReadDeadline(deadline2)
288					// make sure that this was actually execute before the deadline expires
289					Expect(time.Now()).To(BeTemporally("<", deadline2))
290				}()
291				str.SetReadDeadline(deadline1)
292				runtime.Gosched()
293				b := make([]byte, 10)
294				_, err := strWithTimeout.Read(b)
295				Expect(err).To(MatchError(errDeadline))
296				Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond)))
297			})
298
299			It("doesn't unblock if the deadline is removed", func() {
300				deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
301				str.SetReadDeadline(deadline)
302				deadlineUnset := make(chan struct{})
303				go func() {
304					defer GinkgoRecover()
305					time.Sleep(scaleDuration(20 * time.Millisecond))
306					str.SetReadDeadline(time.Time{})
307					// make sure that this was actually execute before the deadline expires
308					Expect(time.Now()).To(BeTemporally("<", deadline))
309					close(deadlineUnset)
310				}()
311				done := make(chan struct{})
312				go func() {
313					defer GinkgoRecover()
314					_, err := strWithTimeout.Read(make([]byte, 1))
315					Expect(err).To(MatchError("test done"))
316					close(done)
317				}()
318				runtime.Gosched()
319				Eventually(deadlineUnset).Should(BeClosed())
320				Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed())
321				// make the go routine return
322				str.closeForShutdown(errors.New("test done"))
323				Eventually(done).Should(BeClosed())
324			})
325		})
326
327		Context("closing", func() {
328			Context("with FIN bit", func() {
329				It("returns EOFs", func() {
330					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
331					mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
332					str.handleStreamFrame(&wire.StreamFrame{
333						Offset: 0,
334						Data:   []byte{0xDE, 0xAD, 0xBE, 0xEF},
335						Fin:    true,
336					})
337					mockSender.EXPECT().onStreamCompleted(streamID)
338					b := make([]byte, 4)
339					n, err := strWithTimeout.Read(b)
340					Expect(err).To(MatchError(io.EOF))
341					Expect(n).To(Equal(4))
342					Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
343					n, err = strWithTimeout.Read(b)
344					Expect(n).To(BeZero())
345					Expect(err).To(MatchError(io.EOF))
346				})
347
348				It("handles out-of-order frames", func() {
349					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
350					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
351					mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
352					frame1 := wire.StreamFrame{
353						Offset: 2,
354						Data:   []byte{0xBE, 0xEF},
355						Fin:    true,
356					}
357					frame2 := wire.StreamFrame{
358						Offset: 0,
359						Data:   []byte{0xDE, 0xAD},
360					}
361					err := str.handleStreamFrame(&frame1)
362					Expect(err).ToNot(HaveOccurred())
363					err = str.handleStreamFrame(&frame2)
364					Expect(err).ToNot(HaveOccurred())
365					mockSender.EXPECT().onStreamCompleted(streamID)
366					b := make([]byte, 4)
367					n, err := strWithTimeout.Read(b)
368					Expect(err).To(MatchError(io.EOF))
369					Expect(n).To(Equal(4))
370					Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
371					n, err = strWithTimeout.Read(b)
372					Expect(n).To(BeZero())
373					Expect(err).To(MatchError(io.EOF))
374				})
375
376				It("returns EOFs with partial read", func() {
377					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true)
378					mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
379					err := str.handleStreamFrame(&wire.StreamFrame{
380						Offset: 0,
381						Data:   []byte{0xde, 0xad},
382						Fin:    true,
383					})
384					Expect(err).ToNot(HaveOccurred())
385					mockSender.EXPECT().onStreamCompleted(streamID)
386					b := make([]byte, 4)
387					n, err := strWithTimeout.Read(b)
388					Expect(err).To(MatchError(io.EOF))
389					Expect(n).To(Equal(2))
390					Expect(b[:n]).To(Equal([]byte{0xde, 0xad}))
391				})
392
393				It("handles immediate FINs", func() {
394					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
395					mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
396					err := str.handleStreamFrame(&wire.StreamFrame{
397						Offset: 0,
398						Fin:    true,
399					})
400					Expect(err).ToNot(HaveOccurred())
401					mockSender.EXPECT().onStreamCompleted(streamID)
402					b := make([]byte, 4)
403					n, err := strWithTimeout.Read(b)
404					Expect(n).To(BeZero())
405					Expect(err).To(MatchError(io.EOF))
406				})
407			})
408
409			It("closes when CloseRemote is called", func() {
410				mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
411				mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
412				str.CloseRemote(0)
413				mockSender.EXPECT().onStreamCompleted(streamID)
414				b := make([]byte, 8)
415				n, err := strWithTimeout.Read(b)
416				Expect(n).To(BeZero())
417				Expect(err).To(MatchError(io.EOF))
418			})
419		})
420
421		Context("closing for shutdown", func() {
422			testErr := errors.New("test error")
423
424			It("immediately returns all reads", func() {
425				done := make(chan struct{})
426				b := make([]byte, 4)
427				go func() {
428					defer GinkgoRecover()
429					n, err := strWithTimeout.Read(b)
430					Expect(n).To(BeZero())
431					Expect(err).To(MatchError(testErr))
432					close(done)
433				}()
434				Consistently(done).ShouldNot(BeClosed())
435				str.closeForShutdown(testErr)
436				Eventually(done).Should(BeClosed())
437			})
438
439			It("errors for all following reads", func() {
440				str.closeForShutdown(testErr)
441				b := make([]byte, 1)
442				n, err := strWithTimeout.Read(b)
443				Expect(n).To(BeZero())
444				Expect(err).To(MatchError(testErr))
445			})
446		})
447	})
448
449	Context("stream cancelations", func() {
450		Context("canceling read", func() {
451			It("unblocks Read", func() {
452				mockSender.EXPECT().queueControlFrame(gomock.Any())
453				done := make(chan struct{})
454				go func() {
455					defer GinkgoRecover()
456					_, err := strWithTimeout.Read([]byte{0})
457					Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
458					close(done)
459				}()
460				Consistently(done).ShouldNot(BeClosed())
461				str.CancelRead(1234)
462				Eventually(done).Should(BeClosed())
463			})
464
465			It("doesn't allow further calls to Read", func() {
466				mockSender.EXPECT().queueControlFrame(gomock.Any())
467				str.CancelRead(1234)
468				_, err := strWithTimeout.Read([]byte{0})
469				Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
470			})
471
472			It("does nothing when CancelRead is called twice", func() {
473				mockSender.EXPECT().queueControlFrame(gomock.Any())
474				str.CancelRead(1234)
475				str.CancelRead(1234)
476				_, err := strWithTimeout.Read([]byte{0})
477				Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
478			})
479
480			It("queues a STOP_SENDING frame", func() {
481				mockSender.EXPECT().queueControlFrame(&wire.StopSendingFrame{
482					StreamID:  streamID,
483					ErrorCode: 1234,
484				})
485				str.CancelRead(1234)
486			})
487
488			It("doesn't send a STOP_SENDING frame, if the FIN was already read", func() {
489				mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true)
490				mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6))
491				// no calls to mockSender.queueControlFrame
492				Expect(str.handleStreamFrame(&wire.StreamFrame{
493					StreamID: streamID,
494					Data:     []byte("foobar"),
495					Fin:      true,
496				})).To(Succeed())
497				mockSender.EXPECT().onStreamCompleted(streamID)
498				_, err := strWithTimeout.Read(make([]byte, 100))
499				Expect(err).To(MatchError(io.EOF))
500				str.CancelRead(1234)
501			})
502
503			It("doesn't send a STOP_SENDING frame, if the stream was already reset", func() {
504				gomock.InOrder(
505					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
506					mockFC.EXPECT().Abandon(),
507				)
508				mockSender.EXPECT().onStreamCompleted(streamID)
509				Expect(str.handleResetStreamFrame(&wire.ResetStreamFrame{
510					StreamID:  streamID,
511					FinalSize: 42,
512				})).To(Succeed())
513				str.CancelRead(1234)
514			})
515
516			It("sends a STOP_SENDING and completes the stream after receiving the final offset", func() {
517				mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true)
518				Expect(str.handleStreamFrame(&wire.StreamFrame{
519					Offset: 1000,
520					Fin:    true,
521				})).To(Succeed())
522				mockFC.EXPECT().Abandon()
523				mockSender.EXPECT().queueControlFrame(gomock.Any())
524				mockSender.EXPECT().onStreamCompleted(streamID)
525				str.CancelRead(1234)
526			})
527
528			It("completes the stream when receiving the Fin after the stream was canceled", func() {
529				mockSender.EXPECT().queueControlFrame(gomock.Any())
530				str.CancelRead(1234)
531				gomock.InOrder(
532					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
533					mockFC.EXPECT().Abandon(),
534				)
535				mockSender.EXPECT().onStreamCompleted(streamID)
536				Expect(str.handleStreamFrame(&wire.StreamFrame{
537					Offset: 1000,
538					Fin:    true,
539				})).To(Succeed())
540			})
541
542			It("handles duplicate FinBits after the stream was canceled", func() {
543				mockSender.EXPECT().queueControlFrame(gomock.Any())
544				str.CancelRead(1234)
545				gomock.InOrder(
546					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
547					mockFC.EXPECT().Abandon(),
548					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
549				)
550				mockSender.EXPECT().onStreamCompleted(streamID)
551				Expect(str.handleStreamFrame(&wire.StreamFrame{
552					Offset: 1000,
553					Fin:    true,
554				})).To(Succeed())
555				Expect(str.handleStreamFrame(&wire.StreamFrame{
556					Offset: 1000,
557					Fin:    true,
558				})).To(Succeed())
559			})
560		})
561
562		Context("receiving RESET_STREAM frames", func() {
563			rst := &wire.ResetStreamFrame{
564				StreamID:  streamID,
565				FinalSize: 42,
566				ErrorCode: 1234,
567			}
568
569			It("unblocks Read", func() {
570				done := make(chan struct{})
571				go func() {
572					defer GinkgoRecover()
573					_, err := strWithTimeout.Read([]byte{0})
574					Expect(err).To(MatchError(&StreamError{
575						StreamID:  streamID,
576						ErrorCode: 1234,
577					}))
578					close(done)
579				}()
580				Consistently(done).ShouldNot(BeClosed())
581				mockSender.EXPECT().onStreamCompleted(streamID)
582				gomock.InOrder(
583					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
584					mockFC.EXPECT().Abandon(),
585				)
586				Expect(str.handleResetStreamFrame(rst)).To(Succeed())
587				Eventually(done).Should(BeClosed())
588			})
589
590			It("doesn't allow further calls to Read", func() {
591				mockSender.EXPECT().onStreamCompleted(streamID)
592				gomock.InOrder(
593					mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
594					mockFC.EXPECT().Abandon(),
595				)
596				Expect(str.handleResetStreamFrame(rst)).To(Succeed())
597				_, err := strWithTimeout.Read([]byte{0})
598				Expect(err).To(MatchError(&StreamError{
599					StreamID:  streamID,
600					ErrorCode: 1234,
601				}))
602			})
603
604			It("errors when receiving a RESET_STREAM with an inconsistent offset", func() {
605				testErr := errors.New("already received a different final offset before")
606				mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Return(testErr)
607				err := str.handleResetStreamFrame(rst)
608				Expect(err).To(MatchError(testErr))
609			})
610
611			It("ignores duplicate RESET_STREAM frames", func() {
612				mockSender.EXPECT().onStreamCompleted(streamID)
613				mockFC.EXPECT().Abandon()
614				mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
615				Expect(str.handleResetStreamFrame(rst)).To(Succeed())
616				Expect(str.handleResetStreamFrame(rst)).To(Succeed())
617			})
618
619			It("doesn't call onStreamCompleted again when the final offset was already received via Fin", func() {
620				mockSender.EXPECT().queueControlFrame(gomock.Any())
621				str.CancelRead(1234)
622				mockSender.EXPECT().onStreamCompleted(streamID)
623				mockFC.EXPECT().Abandon()
624				mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
625				Expect(str.handleStreamFrame(&wire.StreamFrame{
626					StreamID: streamID,
627					Offset:   rst.FinalSize,
628					Fin:      true,
629				})).To(Succeed())
630				Expect(str.handleResetStreamFrame(rst)).To(Succeed())
631			})
632
633			It("doesn't do anyting when it was closed for shutdown", func() {
634				str.closeForShutdown(nil)
635				err := str.handleResetStreamFrame(rst)
636				Expect(err).ToNot(HaveOccurred())
637			})
638		})
639	})
640
641	Context("flow control", func() {
642		It("errors when a STREAM frame causes a flow control violation", func() {
643			testErr := errors.New("flow control violation")
644			mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), false).Return(testErr)
645			frame := wire.StreamFrame{
646				Offset: 2,
647				Data:   []byte("foobar"),
648			}
649			err := str.handleStreamFrame(&frame)
650			Expect(err).To(MatchError(testErr))
651		})
652
653		It("gets a window update", func() {
654			mockFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x100))
655			Expect(str.getWindowUpdate()).To(Equal(protocol.ByteCount(0x100)))
656		})
657	})
658})
659