1package quic
2
3import (
4	"bytes"
5	"errors"
6	"io"
7	"runtime"
8	"time"
9
10	"github.com/golang/mock/gomock"
11	"github.com/lucas-clemente/quic-go/internal/mocks"
12	"github.com/lucas-clemente/quic-go/internal/protocol"
13	"github.com/lucas-clemente/quic-go/internal/wire"
14
15	. "github.com/onsi/ginkgo"
16	. "github.com/onsi/gomega"
17	"github.com/onsi/gomega/gbytes"
18)
19
20var _ = Describe("Send Stream", func() {
21	const streamID protocol.StreamID = 1337
22
23	var (
24		str            *sendStream
25		strWithTimeout io.Writer // str wrapped with gbytes.TimeoutWriter
26		mockFC         *mocks.MockStreamFlowController
27		mockSender     *MockStreamSender
28	)
29
30	BeforeEach(func() {
31		mockSender = NewMockStreamSender(mockCtrl)
32		mockFC = mocks.NewMockStreamFlowController(mockCtrl)
33		str = newSendStream(streamID, mockSender, mockFC, protocol.VersionWhatever)
34
35		timeout := scaleDuration(250 * time.Millisecond)
36		strWithTimeout = gbytes.TimeoutWriter(str, timeout)
37	})
38
39	waitForWrite := func() {
40		EventuallyWithOffset(0, func() []byte {
41			str.mutex.Lock()
42			data := str.dataForWriting
43			str.mutex.Unlock()
44			return data
45		}).ShouldNot(BeEmpty())
46	}
47
48	It("gets stream id", func() {
49		Expect(str.StreamID()).To(Equal(protocol.StreamID(1337)))
50	})
51
52	Context("writing", func() {
53		It("writes and gets all data at once", func() {
54			mockSender.EXPECT().onHasStreamData(streamID)
55			mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
56			mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6))
57			done := make(chan struct{})
58			go func() {
59				defer GinkgoRecover()
60				n, err := strWithTimeout.Write([]byte("foobar"))
61				Expect(err).ToNot(HaveOccurred())
62				Expect(n).To(Equal(6))
63				close(done)
64			}()
65			waitForWrite()
66			f, _ := str.popStreamFrame(1000)
67			Expect(f.Data).To(Equal([]byte("foobar")))
68			Expect(f.FinBit).To(BeFalse())
69			Expect(f.Offset).To(BeZero())
70			Expect(f.DataLenPresent).To(BeTrue())
71			Expect(str.writeOffset).To(Equal(protocol.ByteCount(6)))
72			Expect(str.dataForWriting).To(BeNil())
73			Eventually(done).Should(BeClosed())
74		})
75
76		It("writes and gets data in two turns", func() {
77			mockSender.EXPECT().onHasStreamData(streamID)
78			frameHeaderLen := protocol.ByteCount(4)
79			mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
80			mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2)
81			done := make(chan struct{})
82			go func() {
83				defer GinkgoRecover()
84				n, err := strWithTimeout.Write([]byte("foobar"))
85				Expect(err).ToNot(HaveOccurred())
86				Expect(n).To(Equal(6))
87				close(done)
88			}()
89			waitForWrite()
90			f, _ := str.popStreamFrame(3 + frameHeaderLen)
91			Expect(f.Data).To(Equal([]byte("foo")))
92			Expect(f.FinBit).To(BeFalse())
93			Expect(f.Offset).To(BeZero())
94			Expect(f.DataLenPresent).To(BeTrue())
95			f, _ = str.popStreamFrame(100)
96			Expect(f.Data).To(Equal([]byte("bar")))
97			Expect(f.FinBit).To(BeFalse())
98			Expect(f.Offset).To(Equal(protocol.ByteCount(3)))
99			Expect(f.DataLenPresent).To(BeTrue())
100			Expect(str.popStreamFrame(1000)).To(BeNil())
101			Eventually(done).Should(BeClosed())
102		})
103
104		It("popStreamFrame returns nil if no data is available", func() {
105			frame, hasMoreData := str.popStreamFrame(1000)
106			Expect(frame).To(BeNil())
107			Expect(hasMoreData).To(BeFalse())
108		})
109
110		It("says if it has more data for writing", func() {
111			mockSender.EXPECT().onHasStreamData(streamID)
112			mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
113			mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
114			done := make(chan struct{})
115			go func() {
116				defer GinkgoRecover()
117				n, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
118				Expect(err).ToNot(HaveOccurred())
119				Expect(n).To(Equal(100))
120				close(done)
121			}()
122			waitForWrite()
123			frame, hasMoreData := str.popStreamFrame(50)
124			Expect(frame).ToNot(BeNil())
125			Expect(hasMoreData).To(BeTrue())
126			frame, hasMoreData = str.popStreamFrame(1000)
127			Expect(frame).ToNot(BeNil())
128			Expect(hasMoreData).To(BeFalse())
129			frame, _ = str.popStreamFrame(1000)
130			Expect(frame).To(BeNil())
131			Eventually(done).Should(BeClosed())
132		})
133
134		It("copies the slice while writing", func() {
135			mockSender.EXPECT().onHasStreamData(streamID)
136			frameHeaderSize := protocol.ByteCount(4)
137			mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
138			mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1))
139			mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2))
140			s := []byte("foo")
141			done := make(chan struct{})
142			go func() {
143				defer GinkgoRecover()
144				n, err := strWithTimeout.Write(s)
145				Expect(err).ToNot(HaveOccurred())
146				Expect(n).To(Equal(3))
147				close(done)
148			}()
149			waitForWrite()
150			frame, _ := str.popStreamFrame(frameHeaderSize + 1)
151			Expect(frame.Data).To(Equal([]byte("f")))
152			f, _ := str.popStreamFrame(100)
153			Expect(f).ToNot(BeNil())
154			Expect(f.Data).To(Equal([]byte("oo")))
155			s[1] = 'e'
156			Expect(f.Data).To(Equal([]byte("oo")))
157			Eventually(done).Should(BeClosed())
158		})
159
160		It("returns when given a nil input", func() {
161			n, err := strWithTimeout.Write(nil)
162			Expect(n).To(BeZero())
163			Expect(err).ToNot(HaveOccurred())
164		})
165
166		It("returns when given an empty slice", func() {
167			n, err := strWithTimeout.Write([]byte(""))
168			Expect(n).To(BeZero())
169			Expect(err).ToNot(HaveOccurred())
170		})
171
172		It("cancels the context when Close is called", func() {
173			mockSender.EXPECT().onHasStreamData(streamID)
174			Expect(str.Context().Done()).ToNot(BeClosed())
175			str.Close()
176			Expect(str.Context().Done()).To(BeClosed())
177		})
178
179		Context("flow control blocking", func() {
180			It("queues a BLOCKED frame if the stream is flow control blocked", func() {
181				mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0))
182				mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(12))
183				mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{
184					StreamID:  streamID,
185					DataLimit: 12,
186				})
187				mockSender.EXPECT().onHasStreamData(streamID)
188				done := make(chan struct{})
189				go func() {
190					defer GinkgoRecover()
191					_, err := str.Write([]byte("foobar"))
192					Expect(err).ToNot(HaveOccurred())
193					close(done)
194				}()
195				waitForWrite()
196				f, hasMoreData := str.popStreamFrame(1000)
197				Expect(f).To(BeNil())
198				Expect(hasMoreData).To(BeFalse())
199				// make the Write go routine return
200				str.closeForShutdown(nil)
201				Eventually(done).Should(BeClosed())
202			})
203
204			It("says that it doesn't have any more data, when it is flow control blocked", func() {
205				frameHeaderSize := protocol.ByteCount(4)
206				mockSender.EXPECT().onHasStreamData(streamID)
207
208				done := make(chan struct{})
209				go func() {
210					defer GinkgoRecover()
211					_, err := str.Write([]byte("foobar"))
212					Expect(err).ToNot(HaveOccurred())
213					close(done)
214				}()
215				waitForWrite()
216
217				// first pop a STREAM frame of the maximum size allowed by flow control
218				mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3))
219				mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3))
220				f, hasMoreData := str.popStreamFrame(frameHeaderSize + 3)
221				Expect(f).ToNot(BeNil())
222				Expect(hasMoreData).To(BeTrue())
223
224				// try to pop again, this time noticing that we're blocked
225				mockFC.EXPECT().SendWindowSize()
226				// don't use offset 3 here, to make sure the BLOCKED frame contains the number returned by the flow controller
227				mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10))
228				mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{
229					StreamID:  streamID,
230					DataLimit: 10,
231				})
232				f, hasMoreData = str.popStreamFrame(1000)
233				Expect(f).To(BeNil())
234				Expect(hasMoreData).To(BeFalse())
235				// make the Write go routine return
236				str.closeForShutdown(nil)
237				Eventually(done).Should(BeClosed())
238			})
239		})
240
241		Context("deadlines", func() {
242			It("returns an error when Write is called after the deadline", func() {
243				str.SetWriteDeadline(time.Now().Add(-time.Second))
244				n, err := strWithTimeout.Write([]byte("foobar"))
245				Expect(err).To(MatchError(errDeadline))
246				Expect(n).To(BeZero())
247			})
248
249			It("unblocks after the deadline", func() {
250				mockSender.EXPECT().onHasStreamData(streamID)
251				deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
252				str.SetWriteDeadline(deadline)
253				n, err := strWithTimeout.Write([]byte("foobar"))
254				Expect(err).To(MatchError(errDeadline))
255				Expect(n).To(BeZero())
256				Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
257			})
258
259			It("unblocks when the deadline is changed to the past", func() {
260				mockSender.EXPECT().onHasStreamData(streamID)
261				str.SetWriteDeadline(time.Now().Add(time.Hour))
262				done := make(chan struct{})
263				go func() {
264					defer GinkgoRecover()
265					_, err := str.Write([]byte("foobar"))
266					Expect(err).To(MatchError(errDeadline))
267					close(done)
268				}()
269				Consistently(done).ShouldNot(BeClosed())
270				str.SetWriteDeadline(time.Now().Add(-time.Hour))
271				Eventually(done).Should(BeClosed())
272			})
273
274			It("returns the number of bytes written, when the deadline expires", func() {
275				mockSender.EXPECT().onHasStreamData(streamID)
276				mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
277				mockFC.EXPECT().AddBytesSent(gomock.Any())
278				deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
279				str.SetWriteDeadline(deadline)
280				var n int
281				writeReturned := make(chan struct{})
282				go func() {
283					defer GinkgoRecover()
284					var err error
285					n, err = strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
286					Expect(err).To(MatchError(errDeadline))
287					Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond)))
288					close(writeReturned)
289				}()
290				waitForWrite()
291				frame, hasMoreData := str.popStreamFrame(50)
292				Expect(frame).ToNot(BeNil())
293				Expect(hasMoreData).To(BeTrue())
294				Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed())
295				Expect(n).To(BeEquivalentTo(frame.DataLen()))
296			})
297
298			It("doesn't pop any data after the deadline expired", func() {
299				mockSender.EXPECT().onHasStreamData(streamID)
300				mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes()
301				mockFC.EXPECT().AddBytesSent(gomock.Any())
302				deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
303				str.SetWriteDeadline(deadline)
304				writeReturned := make(chan struct{})
305				go func() {
306					defer GinkgoRecover()
307					_, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
308					Expect(err).To(MatchError(errDeadline))
309					close(writeReturned)
310				}()
311				waitForWrite()
312				frame, hasMoreData := str.popStreamFrame(50)
313				Expect(frame).ToNot(BeNil())
314				Expect(hasMoreData).To(BeTrue())
315				Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed())
316				frame, hasMoreData = str.popStreamFrame(50)
317				Expect(frame).To(BeNil())
318				Expect(hasMoreData).To(BeFalse())
319			})
320
321			It("doesn't unblock if the deadline is changed before the first one expires", func() {
322				mockSender.EXPECT().onHasStreamData(streamID)
323				deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond))
324				deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond))
325				str.SetWriteDeadline(deadline1)
326				done := make(chan struct{})
327				go func() {
328					defer GinkgoRecover()
329					time.Sleep(scaleDuration(20 * time.Millisecond))
330					str.SetWriteDeadline(deadline2)
331					// make sure that this was actually execute before the deadline expires
332					Expect(time.Now()).To(BeTemporally("<", deadline1))
333					close(done)
334				}()
335				runtime.Gosched()
336				n, err := strWithTimeout.Write([]byte("foobar"))
337				Expect(err).To(MatchError(errDeadline))
338				Expect(n).To(BeZero())
339				Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
340				Eventually(done).Should(BeClosed())
341			})
342
343			It("unblocks earlier, when a new deadline is set", func() {
344				mockSender.EXPECT().onHasStreamData(streamID)
345				deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond))
346				deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond))
347				done := make(chan struct{})
348				go func() {
349					defer GinkgoRecover()
350					time.Sleep(scaleDuration(10 * time.Millisecond))
351					str.SetWriteDeadline(deadline2)
352					// make sure that this was actually execute before the deadline expires
353					Expect(time.Now()).To(BeTemporally("<", deadline2))
354					close(done)
355				}()
356				str.SetWriteDeadline(deadline1)
357				runtime.Gosched()
358				_, err := strWithTimeout.Write([]byte("foobar"))
359				Expect(err).To(MatchError(errDeadline))
360				Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
361				Eventually(done).Should(BeClosed())
362			})
363
364			It("doesn't unblock if the deadline is removed", func() {
365				mockSender.EXPECT().onHasStreamData(streamID)
366				deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
367				str.SetWriteDeadline(deadline)
368				deadlineUnset := make(chan struct{})
369				go func() {
370					defer GinkgoRecover()
371					time.Sleep(scaleDuration(20 * time.Millisecond))
372					str.SetWriteDeadline(time.Time{})
373					// make sure that this was actually execute before the deadline expires
374					Expect(time.Now()).To(BeTemporally("<", deadline))
375					close(deadlineUnset)
376				}()
377				done := make(chan struct{})
378				go func() {
379					defer GinkgoRecover()
380					_, err := strWithTimeout.Write([]byte("foobar"))
381					Expect(err).To(MatchError("test done"))
382					close(done)
383				}()
384				runtime.Gosched()
385				Eventually(deadlineUnset).Should(BeClosed())
386				Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed())
387				// make the go routine return
388				str.closeForShutdown(errors.New("test done"))
389				Eventually(done).Should(BeClosed())
390			})
391		})
392
393		Context("closing", func() {
394			It("doesn't allow writes after it has been closed", func() {
395				mockSender.EXPECT().onHasStreamData(streamID)
396				str.Close()
397				_, err := strWithTimeout.Write([]byte("foobar"))
398				Expect(err).To(MatchError("write on closed stream 1337"))
399			})
400
401			It("allows FIN", func() {
402				mockSender.EXPECT().onHasStreamData(streamID)
403				mockSender.EXPECT().onStreamCompleted(streamID)
404				str.Close()
405				f, hasMoreData := str.popStreamFrame(1000)
406				Expect(f).ToNot(BeNil())
407				Expect(f.Data).To(BeEmpty())
408				Expect(f.FinBit).To(BeTrue())
409				Expect(hasMoreData).To(BeFalse())
410			})
411
412			It("doesn't send a FIN when there's still data", func() {
413				mockSender.EXPECT().onHasStreamData(streamID)
414				frameHeaderLen := protocol.ByteCount(4)
415				mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2)
416				mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2)
417				str.dataForWriting = []byte("foobar")
418				Expect(str.Close()).To(Succeed())
419				f, _ := str.popStreamFrame(3 + frameHeaderLen)
420				Expect(f).ToNot(BeNil())
421				Expect(f.Data).To(Equal([]byte("foo")))
422				Expect(f.FinBit).To(BeFalse())
423				mockSender.EXPECT().onStreamCompleted(streamID)
424				f, _ = str.popStreamFrame(100)
425				Expect(f.Data).To(Equal([]byte("bar")))
426				Expect(f.FinBit).To(BeTrue())
427			})
428
429			It("doesn't allow FIN after it is closed for shutdown", func() {
430				str.closeForShutdown(errors.New("test"))
431				f, hasMoreData := str.popStreamFrame(1000)
432				Expect(f).To(BeNil())
433				Expect(hasMoreData).To(BeFalse())
434			})
435
436			It("doesn't allow FIN twice", func() {
437				mockSender.EXPECT().onHasStreamData(streamID)
438				mockSender.EXPECT().onStreamCompleted(streamID)
439				str.Close()
440				f, _ := str.popStreamFrame(1000)
441				Expect(f).ToNot(BeNil())
442				Expect(f.Data).To(BeEmpty())
443				Expect(f.FinBit).To(BeTrue())
444				f, hasMoreData := str.popStreamFrame(1000)
445				Expect(f).To(BeNil())
446				Expect(hasMoreData).To(BeFalse())
447			})
448		})
449
450		Context("closing for shutdown", func() {
451			testErr := errors.New("test")
452
453			It("returns errors when the stream is cancelled", func() {
454				str.closeForShutdown(testErr)
455				n, err := strWithTimeout.Write([]byte("foo"))
456				Expect(n).To(BeZero())
457				Expect(err).To(MatchError(testErr))
458			})
459
460			It("doesn't get data for writing if an error occurred", func() {
461				mockSender.EXPECT().onHasStreamData(streamID)
462				mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999))
463				mockFC.EXPECT().AddBytesSent(gomock.Any())
464				done := make(chan struct{})
465				go func() {
466					defer GinkgoRecover()
467					_, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 500))
468					Expect(err).To(MatchError(testErr))
469					close(done)
470				}()
471				waitForWrite()
472				frame, hasMoreData := str.popStreamFrame(50) // get a STREAM frame containing some data, but not all
473				Expect(frame).ToNot(BeNil())
474				Expect(hasMoreData).To(BeTrue())
475				str.closeForShutdown(testErr)
476				frame, hasMoreData = str.popStreamFrame(1000)
477				Expect(frame).To(BeNil())
478				Expect(hasMoreData).To(BeFalse())
479				Eventually(done).Should(BeClosed())
480			})
481
482			It("cancels the context", func() {
483				Expect(str.Context().Done()).ToNot(BeClosed())
484				str.closeForShutdown(testErr)
485				Expect(str.Context().Done()).To(BeClosed())
486			})
487		})
488	})
489
490	Context("handling MAX_STREAM_DATA frames", func() {
491		It("informs the flow controller", func() {
492			mockFC.EXPECT().UpdateSendWindow(protocol.ByteCount(0x1337))
493			str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
494				StreamID:   streamID,
495				ByteOffset: 0x1337,
496			})
497		})
498
499		It("says when it has data for sending", func() {
500			mockFC.EXPECT().UpdateSendWindow(gomock.Any())
501			mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for Write, once for the MAX_STREAM_DATA frame
502			done := make(chan struct{})
503			go func() {
504				defer GinkgoRecover()
505				_, err := str.Write([]byte("foobar"))
506				Expect(err).ToNot(HaveOccurred())
507				close(done)
508			}()
509			waitForWrite()
510			str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{
511				StreamID:   streamID,
512				ByteOffset: 42,
513			})
514			// make sure the Write go routine returns
515			str.closeForShutdown(nil)
516			Eventually(done).Should(BeClosed())
517		})
518	})
519
520	Context("stream cancelations", func() {
521		Context("canceling writing", func() {
522			It("queues a RESET_STREAM frame", func() {
523				mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{
524					StreamID:   streamID,
525					ByteOffset: 1234,
526					ErrorCode:  9876,
527				})
528				mockSender.EXPECT().onStreamCompleted(streamID)
529				str.writeOffset = 1234
530				str.CancelWrite(9876)
531			})
532
533			It("unblocks Write", func() {
534				mockSender.EXPECT().onHasStreamData(streamID)
535				mockSender.EXPECT().onStreamCompleted(streamID)
536				mockSender.EXPECT().queueControlFrame(gomock.Any())
537				mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
538				mockFC.EXPECT().AddBytesSent(gomock.Any())
539				writeReturned := make(chan struct{})
540				var n int
541				go func() {
542					defer GinkgoRecover()
543					var err error
544					n, err = strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
545					Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234"))
546					close(writeReturned)
547				}()
548				waitForWrite()
549				frame, _ := str.popStreamFrame(50)
550				Expect(frame).ToNot(BeNil())
551				str.CancelWrite(1234)
552				Eventually(writeReturned).Should(BeClosed())
553				Expect(n).To(BeEquivalentTo(frame.DataLen()))
554			})
555
556			It("doesn't pop STREAM frames after being canceled", func() {
557				mockSender.EXPECT().onHasStreamData(streamID)
558				mockSender.EXPECT().onStreamCompleted(streamID)
559				mockSender.EXPECT().queueControlFrame(gomock.Any())
560				mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount)
561				mockFC.EXPECT().AddBytesSent(gomock.Any())
562				writeReturned := make(chan struct{})
563				go func() {
564					defer GinkgoRecover()
565					_, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100))
566					Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234"))
567					close(writeReturned)
568				}()
569				waitForWrite()
570				frame, hasMoreData := str.popStreamFrame(50)
571				Expect(hasMoreData).To(BeTrue())
572				Expect(frame).ToNot(BeNil())
573				str.CancelWrite(1234)
574				frame, hasMoreData = str.popStreamFrame(10)
575				Expect(hasMoreData).To(BeFalse())
576				Expect(frame).To(BeNil())
577				Eventually(writeReturned).Should(BeClosed())
578			})
579
580			It("cancels the context", func() {
581				mockSender.EXPECT().queueControlFrame(gomock.Any())
582				mockSender.EXPECT().onStreamCompleted(streamID)
583				Expect(str.Context().Done()).ToNot(BeClosed())
584				str.CancelWrite(1234)
585				Expect(str.Context().Done()).To(BeClosed())
586			})
587
588			It("doesn't allow further calls to Write", func() {
589				mockSender.EXPECT().queueControlFrame(gomock.Any())
590				mockSender.EXPECT().onStreamCompleted(streamID)
591				str.CancelWrite(1234)
592				_, err := strWithTimeout.Write([]byte("foobar"))
593				Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234"))
594			})
595
596			It("only cancels once", func() {
597				mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{StreamID: streamID, ErrorCode: 1234})
598				mockSender.EXPECT().onStreamCompleted(streamID)
599				str.CancelWrite(1234)
600				str.CancelWrite(4321)
601			})
602
603			It("doesn't do anything when the stream was already closed", func() {
604				mockSender.EXPECT().onHasStreamData(streamID)
605				Expect(str.Close()).To(Succeed())
606				// don't EXPECT any calls to queueControlFrame
607				str.CancelWrite(123)
608			})
609		})
610
611		Context("receiving STOP_SENDING frames", func() {
612			It("queues a RESET_STREAM frames with error code Stopping", func() {
613				mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{
614					StreamID:  streamID,
615					ErrorCode: errorCodeStopping,
616				})
617				mockSender.EXPECT().onStreamCompleted(streamID)
618				str.handleStopSendingFrame(&wire.StopSendingFrame{
619					StreamID:  streamID,
620					ErrorCode: 101,
621				})
622			})
623
624			It("unblocks Write", func() {
625				mockSender.EXPECT().onHasStreamData(streamID)
626				mockSender.EXPECT().queueControlFrame(gomock.Any())
627				done := make(chan struct{})
628				go func() {
629					defer GinkgoRecover()
630					_, err := str.Write([]byte("foobar"))
631					Expect(err).To(MatchError("Stream 1337 was reset with error code 123"))
632					Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
633					Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
634					Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(123)))
635					close(done)
636				}()
637				waitForWrite()
638				mockSender.EXPECT().onStreamCompleted(streamID)
639				str.handleStopSendingFrame(&wire.StopSendingFrame{
640					StreamID:  streamID,
641					ErrorCode: 123,
642				})
643				Eventually(done).Should(BeClosed())
644			})
645
646			It("doesn't allow further calls to Write", func() {
647				mockSender.EXPECT().queueControlFrame(gomock.Any())
648				mockSender.EXPECT().onStreamCompleted(streamID)
649				str.handleStopSendingFrame(&wire.StopSendingFrame{
650					StreamID:  streamID,
651					ErrorCode: 123,
652				})
653				_, err := str.Write([]byte("foobar"))
654				Expect(err).To(MatchError("Stream 1337 was reset with error code 123"))
655				Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
656				Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
657				Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(123)))
658			})
659		})
660	})
661})
662