1package quic 2 3import ( 4 "bytes" 5 "errors" 6 "io" 7 "runtime" 8 "time" 9 10 "github.com/golang/mock/gomock" 11 "github.com/lucas-clemente/quic-go/internal/mocks" 12 "github.com/lucas-clemente/quic-go/internal/protocol" 13 "github.com/lucas-clemente/quic-go/internal/wire" 14 15 . "github.com/onsi/ginkgo" 16 . "github.com/onsi/gomega" 17 "github.com/onsi/gomega/gbytes" 18) 19 20var _ = Describe("Send Stream", func() { 21 const streamID protocol.StreamID = 1337 22 23 var ( 24 str *sendStream 25 strWithTimeout io.Writer // str wrapped with gbytes.TimeoutWriter 26 mockFC *mocks.MockStreamFlowController 27 mockSender *MockStreamSender 28 ) 29 30 BeforeEach(func() { 31 mockSender = NewMockStreamSender(mockCtrl) 32 mockFC = mocks.NewMockStreamFlowController(mockCtrl) 33 str = newSendStream(streamID, mockSender, mockFC, protocol.VersionWhatever) 34 35 timeout := scaleDuration(250 * time.Millisecond) 36 strWithTimeout = gbytes.TimeoutWriter(str, timeout) 37 }) 38 39 waitForWrite := func() { 40 EventuallyWithOffset(0, func() []byte { 41 str.mutex.Lock() 42 data := str.dataForWriting 43 str.mutex.Unlock() 44 return data 45 }).ShouldNot(BeEmpty()) 46 } 47 48 It("gets stream id", func() { 49 Expect(str.StreamID()).To(Equal(protocol.StreamID(1337))) 50 }) 51 52 Context("writing", func() { 53 It("writes and gets all data at once", func() { 54 mockSender.EXPECT().onHasStreamData(streamID) 55 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) 56 mockFC.EXPECT().AddBytesSent(protocol.ByteCount(6)) 57 done := make(chan struct{}) 58 go func() { 59 defer GinkgoRecover() 60 n, err := strWithTimeout.Write([]byte("foobar")) 61 Expect(err).ToNot(HaveOccurred()) 62 Expect(n).To(Equal(6)) 63 close(done) 64 }() 65 waitForWrite() 66 f, _ := str.popStreamFrame(1000) 67 Expect(f.Data).To(Equal([]byte("foobar"))) 68 Expect(f.FinBit).To(BeFalse()) 69 Expect(f.Offset).To(BeZero()) 70 Expect(f.DataLenPresent).To(BeTrue()) 71 Expect(str.writeOffset).To(Equal(protocol.ByteCount(6))) 72 Expect(str.dataForWriting).To(BeNil()) 73 Eventually(done).Should(BeClosed()) 74 }) 75 76 It("writes and gets data in two turns", func() { 77 mockSender.EXPECT().onHasStreamData(streamID) 78 frameHeaderLen := protocol.ByteCount(4) 79 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) 80 mockFC.EXPECT().AddBytesSent(gomock.Any() /* protocol.ByteCount(3)*/).Times(2) 81 done := make(chan struct{}) 82 go func() { 83 defer GinkgoRecover() 84 n, err := strWithTimeout.Write([]byte("foobar")) 85 Expect(err).ToNot(HaveOccurred()) 86 Expect(n).To(Equal(6)) 87 close(done) 88 }() 89 waitForWrite() 90 f, _ := str.popStreamFrame(3 + frameHeaderLen) 91 Expect(f.Data).To(Equal([]byte("foo"))) 92 Expect(f.FinBit).To(BeFalse()) 93 Expect(f.Offset).To(BeZero()) 94 Expect(f.DataLenPresent).To(BeTrue()) 95 f, _ = str.popStreamFrame(100) 96 Expect(f.Data).To(Equal([]byte("bar"))) 97 Expect(f.FinBit).To(BeFalse()) 98 Expect(f.Offset).To(Equal(protocol.ByteCount(3))) 99 Expect(f.DataLenPresent).To(BeTrue()) 100 Expect(str.popStreamFrame(1000)).To(BeNil()) 101 Eventually(done).Should(BeClosed()) 102 }) 103 104 It("popStreamFrame returns nil if no data is available", func() { 105 frame, hasMoreData := str.popStreamFrame(1000) 106 Expect(frame).To(BeNil()) 107 Expect(hasMoreData).To(BeFalse()) 108 }) 109 110 It("says if it has more data for writing", func() { 111 mockSender.EXPECT().onHasStreamData(streamID) 112 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) 113 mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) 114 done := make(chan struct{}) 115 go func() { 116 defer GinkgoRecover() 117 n, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100)) 118 Expect(err).ToNot(HaveOccurred()) 119 Expect(n).To(Equal(100)) 120 close(done) 121 }() 122 waitForWrite() 123 frame, hasMoreData := str.popStreamFrame(50) 124 Expect(frame).ToNot(BeNil()) 125 Expect(hasMoreData).To(BeTrue()) 126 frame, hasMoreData = str.popStreamFrame(1000) 127 Expect(frame).ToNot(BeNil()) 128 Expect(hasMoreData).To(BeFalse()) 129 frame, _ = str.popStreamFrame(1000) 130 Expect(frame).To(BeNil()) 131 Eventually(done).Should(BeClosed()) 132 }) 133 134 It("copies the slice while writing", func() { 135 mockSender.EXPECT().onHasStreamData(streamID) 136 frameHeaderSize := protocol.ByteCount(4) 137 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) 138 mockFC.EXPECT().AddBytesSent(protocol.ByteCount(1)) 139 mockFC.EXPECT().AddBytesSent(protocol.ByteCount(2)) 140 s := []byte("foo") 141 done := make(chan struct{}) 142 go func() { 143 defer GinkgoRecover() 144 n, err := strWithTimeout.Write(s) 145 Expect(err).ToNot(HaveOccurred()) 146 Expect(n).To(Equal(3)) 147 close(done) 148 }() 149 waitForWrite() 150 frame, _ := str.popStreamFrame(frameHeaderSize + 1) 151 Expect(frame.Data).To(Equal([]byte("f"))) 152 f, _ := str.popStreamFrame(100) 153 Expect(f).ToNot(BeNil()) 154 Expect(f.Data).To(Equal([]byte("oo"))) 155 s[1] = 'e' 156 Expect(f.Data).To(Equal([]byte("oo"))) 157 Eventually(done).Should(BeClosed()) 158 }) 159 160 It("returns when given a nil input", func() { 161 n, err := strWithTimeout.Write(nil) 162 Expect(n).To(BeZero()) 163 Expect(err).ToNot(HaveOccurred()) 164 }) 165 166 It("returns when given an empty slice", func() { 167 n, err := strWithTimeout.Write([]byte("")) 168 Expect(n).To(BeZero()) 169 Expect(err).ToNot(HaveOccurred()) 170 }) 171 172 It("cancels the context when Close is called", func() { 173 mockSender.EXPECT().onHasStreamData(streamID) 174 Expect(str.Context().Done()).ToNot(BeClosed()) 175 str.Close() 176 Expect(str.Context().Done()).To(BeClosed()) 177 }) 178 179 Context("flow control blocking", func() { 180 It("queues a BLOCKED frame if the stream is flow control blocked", func() { 181 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(0)) 182 mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(12)) 183 mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{ 184 StreamID: streamID, 185 DataLimit: 12, 186 }) 187 mockSender.EXPECT().onHasStreamData(streamID) 188 done := make(chan struct{}) 189 go func() { 190 defer GinkgoRecover() 191 _, err := str.Write([]byte("foobar")) 192 Expect(err).ToNot(HaveOccurred()) 193 close(done) 194 }() 195 waitForWrite() 196 f, hasMoreData := str.popStreamFrame(1000) 197 Expect(f).To(BeNil()) 198 Expect(hasMoreData).To(BeFalse()) 199 // make the Write go routine return 200 str.closeForShutdown(nil) 201 Eventually(done).Should(BeClosed()) 202 }) 203 204 It("says that it doesn't have any more data, when it is flow control blocked", func() { 205 frameHeaderSize := protocol.ByteCount(4) 206 mockSender.EXPECT().onHasStreamData(streamID) 207 208 done := make(chan struct{}) 209 go func() { 210 defer GinkgoRecover() 211 _, err := str.Write([]byte("foobar")) 212 Expect(err).ToNot(HaveOccurred()) 213 close(done) 214 }() 215 waitForWrite() 216 217 // first pop a STREAM frame of the maximum size allowed by flow control 218 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(3)) 219 mockFC.EXPECT().AddBytesSent(protocol.ByteCount(3)) 220 f, hasMoreData := str.popStreamFrame(frameHeaderSize + 3) 221 Expect(f).ToNot(BeNil()) 222 Expect(hasMoreData).To(BeTrue()) 223 224 // try to pop again, this time noticing that we're blocked 225 mockFC.EXPECT().SendWindowSize() 226 // don't use offset 3 here, to make sure the BLOCKED frame contains the number returned by the flow controller 227 mockFC.EXPECT().IsNewlyBlocked().Return(true, protocol.ByteCount(10)) 228 mockSender.EXPECT().queueControlFrame(&wire.StreamDataBlockedFrame{ 229 StreamID: streamID, 230 DataLimit: 10, 231 }) 232 f, hasMoreData = str.popStreamFrame(1000) 233 Expect(f).To(BeNil()) 234 Expect(hasMoreData).To(BeFalse()) 235 // make the Write go routine return 236 str.closeForShutdown(nil) 237 Eventually(done).Should(BeClosed()) 238 }) 239 }) 240 241 Context("deadlines", func() { 242 It("returns an error when Write is called after the deadline", func() { 243 str.SetWriteDeadline(time.Now().Add(-time.Second)) 244 n, err := strWithTimeout.Write([]byte("foobar")) 245 Expect(err).To(MatchError(errDeadline)) 246 Expect(n).To(BeZero()) 247 }) 248 249 It("unblocks after the deadline", func() { 250 mockSender.EXPECT().onHasStreamData(streamID) 251 deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) 252 str.SetWriteDeadline(deadline) 253 n, err := strWithTimeout.Write([]byte("foobar")) 254 Expect(err).To(MatchError(errDeadline)) 255 Expect(n).To(BeZero()) 256 Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond))) 257 }) 258 259 It("unblocks when the deadline is changed to the past", func() { 260 mockSender.EXPECT().onHasStreamData(streamID) 261 str.SetWriteDeadline(time.Now().Add(time.Hour)) 262 done := make(chan struct{}) 263 go func() { 264 defer GinkgoRecover() 265 _, err := str.Write([]byte("foobar")) 266 Expect(err).To(MatchError(errDeadline)) 267 close(done) 268 }() 269 Consistently(done).ShouldNot(BeClosed()) 270 str.SetWriteDeadline(time.Now().Add(-time.Hour)) 271 Eventually(done).Should(BeClosed()) 272 }) 273 274 It("returns the number of bytes written, when the deadline expires", func() { 275 mockSender.EXPECT().onHasStreamData(streamID) 276 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes() 277 mockFC.EXPECT().AddBytesSent(gomock.Any()) 278 deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) 279 str.SetWriteDeadline(deadline) 280 var n int 281 writeReturned := make(chan struct{}) 282 go func() { 283 defer GinkgoRecover() 284 var err error 285 n, err = strWithTimeout.Write(bytes.Repeat([]byte{0}, 100)) 286 Expect(err).To(MatchError(errDeadline)) 287 Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond))) 288 close(writeReturned) 289 }() 290 waitForWrite() 291 frame, hasMoreData := str.popStreamFrame(50) 292 Expect(frame).ToNot(BeNil()) 293 Expect(hasMoreData).To(BeTrue()) 294 Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed()) 295 Expect(n).To(BeEquivalentTo(frame.DataLen())) 296 }) 297 298 It("doesn't pop any data after the deadline expired", func() { 299 mockSender.EXPECT().onHasStreamData(streamID) 300 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(10000)).AnyTimes() 301 mockFC.EXPECT().AddBytesSent(gomock.Any()) 302 deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) 303 str.SetWriteDeadline(deadline) 304 writeReturned := make(chan struct{}) 305 go func() { 306 defer GinkgoRecover() 307 _, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100)) 308 Expect(err).To(MatchError(errDeadline)) 309 close(writeReturned) 310 }() 311 waitForWrite() 312 frame, hasMoreData := str.popStreamFrame(50) 313 Expect(frame).ToNot(BeNil()) 314 Expect(hasMoreData).To(BeTrue()) 315 Eventually(writeReturned, scaleDuration(80*time.Millisecond)).Should(BeClosed()) 316 frame, hasMoreData = str.popStreamFrame(50) 317 Expect(frame).To(BeNil()) 318 Expect(hasMoreData).To(BeFalse()) 319 }) 320 321 It("doesn't unblock if the deadline is changed before the first one expires", func() { 322 mockSender.EXPECT().onHasStreamData(streamID) 323 deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond)) 324 deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond)) 325 str.SetWriteDeadline(deadline1) 326 done := make(chan struct{}) 327 go func() { 328 defer GinkgoRecover() 329 time.Sleep(scaleDuration(20 * time.Millisecond)) 330 str.SetWriteDeadline(deadline2) 331 // make sure that this was actually execute before the deadline expires 332 Expect(time.Now()).To(BeTemporally("<", deadline1)) 333 close(done) 334 }() 335 runtime.Gosched() 336 n, err := strWithTimeout.Write([]byte("foobar")) 337 Expect(err).To(MatchError(errDeadline)) 338 Expect(n).To(BeZero()) 339 Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) 340 Eventually(done).Should(BeClosed()) 341 }) 342 343 It("unblocks earlier, when a new deadline is set", func() { 344 mockSender.EXPECT().onHasStreamData(streamID) 345 deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond)) 346 deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond)) 347 done := make(chan struct{}) 348 go func() { 349 defer GinkgoRecover() 350 time.Sleep(scaleDuration(10 * time.Millisecond)) 351 str.SetWriteDeadline(deadline2) 352 // make sure that this was actually execute before the deadline expires 353 Expect(time.Now()).To(BeTemporally("<", deadline2)) 354 close(done) 355 }() 356 str.SetWriteDeadline(deadline1) 357 runtime.Gosched() 358 _, err := strWithTimeout.Write([]byte("foobar")) 359 Expect(err).To(MatchError(errDeadline)) 360 Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) 361 Eventually(done).Should(BeClosed()) 362 }) 363 364 It("doesn't unblock if the deadline is removed", func() { 365 mockSender.EXPECT().onHasStreamData(streamID) 366 deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) 367 str.SetWriteDeadline(deadline) 368 deadlineUnset := make(chan struct{}) 369 go func() { 370 defer GinkgoRecover() 371 time.Sleep(scaleDuration(20 * time.Millisecond)) 372 str.SetWriteDeadline(time.Time{}) 373 // make sure that this was actually execute before the deadline expires 374 Expect(time.Now()).To(BeTemporally("<", deadline)) 375 close(deadlineUnset) 376 }() 377 done := make(chan struct{}) 378 go func() { 379 defer GinkgoRecover() 380 _, err := strWithTimeout.Write([]byte("foobar")) 381 Expect(err).To(MatchError("test done")) 382 close(done) 383 }() 384 runtime.Gosched() 385 Eventually(deadlineUnset).Should(BeClosed()) 386 Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed()) 387 // make the go routine return 388 str.closeForShutdown(errors.New("test done")) 389 Eventually(done).Should(BeClosed()) 390 }) 391 }) 392 393 Context("closing", func() { 394 It("doesn't allow writes after it has been closed", func() { 395 mockSender.EXPECT().onHasStreamData(streamID) 396 str.Close() 397 _, err := strWithTimeout.Write([]byte("foobar")) 398 Expect(err).To(MatchError("write on closed stream 1337")) 399 }) 400 401 It("allows FIN", func() { 402 mockSender.EXPECT().onHasStreamData(streamID) 403 mockSender.EXPECT().onStreamCompleted(streamID) 404 str.Close() 405 f, hasMoreData := str.popStreamFrame(1000) 406 Expect(f).ToNot(BeNil()) 407 Expect(f.Data).To(BeEmpty()) 408 Expect(f.FinBit).To(BeTrue()) 409 Expect(hasMoreData).To(BeFalse()) 410 }) 411 412 It("doesn't send a FIN when there's still data", func() { 413 mockSender.EXPECT().onHasStreamData(streamID) 414 frameHeaderLen := protocol.ByteCount(4) 415 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)).Times(2) 416 mockFC.EXPECT().AddBytesSent(gomock.Any()).Times(2) 417 str.dataForWriting = []byte("foobar") 418 Expect(str.Close()).To(Succeed()) 419 f, _ := str.popStreamFrame(3 + frameHeaderLen) 420 Expect(f).ToNot(BeNil()) 421 Expect(f.Data).To(Equal([]byte("foo"))) 422 Expect(f.FinBit).To(BeFalse()) 423 mockSender.EXPECT().onStreamCompleted(streamID) 424 f, _ = str.popStreamFrame(100) 425 Expect(f.Data).To(Equal([]byte("bar"))) 426 Expect(f.FinBit).To(BeTrue()) 427 }) 428 429 It("doesn't allow FIN after it is closed for shutdown", func() { 430 str.closeForShutdown(errors.New("test")) 431 f, hasMoreData := str.popStreamFrame(1000) 432 Expect(f).To(BeNil()) 433 Expect(hasMoreData).To(BeFalse()) 434 }) 435 436 It("doesn't allow FIN twice", func() { 437 mockSender.EXPECT().onHasStreamData(streamID) 438 mockSender.EXPECT().onStreamCompleted(streamID) 439 str.Close() 440 f, _ := str.popStreamFrame(1000) 441 Expect(f).ToNot(BeNil()) 442 Expect(f.Data).To(BeEmpty()) 443 Expect(f.FinBit).To(BeTrue()) 444 f, hasMoreData := str.popStreamFrame(1000) 445 Expect(f).To(BeNil()) 446 Expect(hasMoreData).To(BeFalse()) 447 }) 448 }) 449 450 Context("closing for shutdown", func() { 451 testErr := errors.New("test") 452 453 It("returns errors when the stream is cancelled", func() { 454 str.closeForShutdown(testErr) 455 n, err := strWithTimeout.Write([]byte("foo")) 456 Expect(n).To(BeZero()) 457 Expect(err).To(MatchError(testErr)) 458 }) 459 460 It("doesn't get data for writing if an error occurred", func() { 461 mockSender.EXPECT().onHasStreamData(streamID) 462 mockFC.EXPECT().SendWindowSize().Return(protocol.ByteCount(9999)) 463 mockFC.EXPECT().AddBytesSent(gomock.Any()) 464 done := make(chan struct{}) 465 go func() { 466 defer GinkgoRecover() 467 _, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 500)) 468 Expect(err).To(MatchError(testErr)) 469 close(done) 470 }() 471 waitForWrite() 472 frame, hasMoreData := str.popStreamFrame(50) // get a STREAM frame containing some data, but not all 473 Expect(frame).ToNot(BeNil()) 474 Expect(hasMoreData).To(BeTrue()) 475 str.closeForShutdown(testErr) 476 frame, hasMoreData = str.popStreamFrame(1000) 477 Expect(frame).To(BeNil()) 478 Expect(hasMoreData).To(BeFalse()) 479 Eventually(done).Should(BeClosed()) 480 }) 481 482 It("cancels the context", func() { 483 Expect(str.Context().Done()).ToNot(BeClosed()) 484 str.closeForShutdown(testErr) 485 Expect(str.Context().Done()).To(BeClosed()) 486 }) 487 }) 488 }) 489 490 Context("handling MAX_STREAM_DATA frames", func() { 491 It("informs the flow controller", func() { 492 mockFC.EXPECT().UpdateSendWindow(protocol.ByteCount(0x1337)) 493 str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{ 494 StreamID: streamID, 495 ByteOffset: 0x1337, 496 }) 497 }) 498 499 It("says when it has data for sending", func() { 500 mockFC.EXPECT().UpdateSendWindow(gomock.Any()) 501 mockSender.EXPECT().onHasStreamData(streamID).Times(2) // once for Write, once for the MAX_STREAM_DATA frame 502 done := make(chan struct{}) 503 go func() { 504 defer GinkgoRecover() 505 _, err := str.Write([]byte("foobar")) 506 Expect(err).ToNot(HaveOccurred()) 507 close(done) 508 }() 509 waitForWrite() 510 str.handleMaxStreamDataFrame(&wire.MaxStreamDataFrame{ 511 StreamID: streamID, 512 ByteOffset: 42, 513 }) 514 // make sure the Write go routine returns 515 str.closeForShutdown(nil) 516 Eventually(done).Should(BeClosed()) 517 }) 518 }) 519 520 Context("stream cancelations", func() { 521 Context("canceling writing", func() { 522 It("queues a RESET_STREAM frame", func() { 523 mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{ 524 StreamID: streamID, 525 ByteOffset: 1234, 526 ErrorCode: 9876, 527 }) 528 mockSender.EXPECT().onStreamCompleted(streamID) 529 str.writeOffset = 1234 530 str.CancelWrite(9876) 531 }) 532 533 It("unblocks Write", func() { 534 mockSender.EXPECT().onHasStreamData(streamID) 535 mockSender.EXPECT().onStreamCompleted(streamID) 536 mockSender.EXPECT().queueControlFrame(gomock.Any()) 537 mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) 538 mockFC.EXPECT().AddBytesSent(gomock.Any()) 539 writeReturned := make(chan struct{}) 540 var n int 541 go func() { 542 defer GinkgoRecover() 543 var err error 544 n, err = strWithTimeout.Write(bytes.Repeat([]byte{0}, 100)) 545 Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234")) 546 close(writeReturned) 547 }() 548 waitForWrite() 549 frame, _ := str.popStreamFrame(50) 550 Expect(frame).ToNot(BeNil()) 551 str.CancelWrite(1234) 552 Eventually(writeReturned).Should(BeClosed()) 553 Expect(n).To(BeEquivalentTo(frame.DataLen())) 554 }) 555 556 It("doesn't pop STREAM frames after being canceled", func() { 557 mockSender.EXPECT().onHasStreamData(streamID) 558 mockSender.EXPECT().onStreamCompleted(streamID) 559 mockSender.EXPECT().queueControlFrame(gomock.Any()) 560 mockFC.EXPECT().SendWindowSize().Return(protocol.MaxByteCount) 561 mockFC.EXPECT().AddBytesSent(gomock.Any()) 562 writeReturned := make(chan struct{}) 563 go func() { 564 defer GinkgoRecover() 565 _, err := strWithTimeout.Write(bytes.Repeat([]byte{0}, 100)) 566 Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234")) 567 close(writeReturned) 568 }() 569 waitForWrite() 570 frame, hasMoreData := str.popStreamFrame(50) 571 Expect(hasMoreData).To(BeTrue()) 572 Expect(frame).ToNot(BeNil()) 573 str.CancelWrite(1234) 574 frame, hasMoreData = str.popStreamFrame(10) 575 Expect(hasMoreData).To(BeFalse()) 576 Expect(frame).To(BeNil()) 577 Eventually(writeReturned).Should(BeClosed()) 578 }) 579 580 It("cancels the context", func() { 581 mockSender.EXPECT().queueControlFrame(gomock.Any()) 582 mockSender.EXPECT().onStreamCompleted(streamID) 583 Expect(str.Context().Done()).ToNot(BeClosed()) 584 str.CancelWrite(1234) 585 Expect(str.Context().Done()).To(BeClosed()) 586 }) 587 588 It("doesn't allow further calls to Write", func() { 589 mockSender.EXPECT().queueControlFrame(gomock.Any()) 590 mockSender.EXPECT().onStreamCompleted(streamID) 591 str.CancelWrite(1234) 592 _, err := strWithTimeout.Write([]byte("foobar")) 593 Expect(err).To(MatchError("Write on stream 1337 canceled with error code 1234")) 594 }) 595 596 It("only cancels once", func() { 597 mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{StreamID: streamID, ErrorCode: 1234}) 598 mockSender.EXPECT().onStreamCompleted(streamID) 599 str.CancelWrite(1234) 600 str.CancelWrite(4321) 601 }) 602 603 It("doesn't do anything when the stream was already closed", func() { 604 mockSender.EXPECT().onHasStreamData(streamID) 605 Expect(str.Close()).To(Succeed()) 606 // don't EXPECT any calls to queueControlFrame 607 str.CancelWrite(123) 608 }) 609 }) 610 611 Context("receiving STOP_SENDING frames", func() { 612 It("queues a RESET_STREAM frames with error code Stopping", func() { 613 mockSender.EXPECT().queueControlFrame(&wire.ResetStreamFrame{ 614 StreamID: streamID, 615 ErrorCode: errorCodeStopping, 616 }) 617 mockSender.EXPECT().onStreamCompleted(streamID) 618 str.handleStopSendingFrame(&wire.StopSendingFrame{ 619 StreamID: streamID, 620 ErrorCode: 101, 621 }) 622 }) 623 624 It("unblocks Write", func() { 625 mockSender.EXPECT().onHasStreamData(streamID) 626 mockSender.EXPECT().queueControlFrame(gomock.Any()) 627 done := make(chan struct{}) 628 go func() { 629 defer GinkgoRecover() 630 _, err := str.Write([]byte("foobar")) 631 Expect(err).To(MatchError("Stream 1337 was reset with error code 123")) 632 Expect(err).To(BeAssignableToTypeOf(streamCanceledError{})) 633 Expect(err.(streamCanceledError).Canceled()).To(BeTrue()) 634 Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(123))) 635 close(done) 636 }() 637 waitForWrite() 638 mockSender.EXPECT().onStreamCompleted(streamID) 639 str.handleStopSendingFrame(&wire.StopSendingFrame{ 640 StreamID: streamID, 641 ErrorCode: 123, 642 }) 643 Eventually(done).Should(BeClosed()) 644 }) 645 646 It("doesn't allow further calls to Write", func() { 647 mockSender.EXPECT().queueControlFrame(gomock.Any()) 648 mockSender.EXPECT().onStreamCompleted(streamID) 649 str.handleStopSendingFrame(&wire.StopSendingFrame{ 650 StreamID: streamID, 651 ErrorCode: 123, 652 }) 653 _, err := str.Write([]byte("foobar")) 654 Expect(err).To(MatchError("Stream 1337 was reset with error code 123")) 655 Expect(err).To(BeAssignableToTypeOf(streamCanceledError{})) 656 Expect(err.(streamCanceledError).Canceled()).To(BeTrue()) 657 Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(123))) 658 }) 659 }) 660 }) 661}) 662