1package quic 2 3import ( 4 "errors" 5 "io" 6 "runtime" 7 "time" 8 9 "github.com/golang/mock/gomock" 10 "github.com/lucas-clemente/quic-go/internal/mocks" 11 "github.com/lucas-clemente/quic-go/internal/protocol" 12 "github.com/lucas-clemente/quic-go/internal/wire" 13 14 . "github.com/onsi/ginkgo" 15 . "github.com/onsi/gomega" 16 "github.com/onsi/gomega/gbytes" 17) 18 19var _ = Describe("Receive Stream", func() { 20 const streamID protocol.StreamID = 1337 21 22 var ( 23 str *receiveStream 24 strWithTimeout io.Reader // str wrapped with gbytes.TimeoutReader 25 mockFC *mocks.MockStreamFlowController 26 mockSender *MockStreamSender 27 ) 28 29 BeforeEach(func() { 30 mockSender = NewMockStreamSender(mockCtrl) 31 mockFC = mocks.NewMockStreamFlowController(mockCtrl) 32 str = newReceiveStream(streamID, mockSender, mockFC, protocol.VersionWhatever) 33 34 timeout := scaleDuration(250 * time.Millisecond) 35 strWithTimeout = gbytes.TimeoutReader(str, timeout) 36 }) 37 38 It("gets stream id", func() { 39 Expect(str.StreamID()).To(Equal(protocol.StreamID(1337))) 40 }) 41 42 Context("reading", func() { 43 It("reads a single STREAM frame", func() { 44 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) 45 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)) 46 frame := wire.StreamFrame{ 47 Offset: 0, 48 Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, 49 } 50 err := str.handleStreamFrame(&frame) 51 Expect(err).ToNot(HaveOccurred()) 52 b := make([]byte, 4) 53 n, err := strWithTimeout.Read(b) 54 Expect(err).ToNot(HaveOccurred()) 55 Expect(n).To(Equal(4)) 56 Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) 57 }) 58 59 It("reads a single STREAM frame in multiple goes", func() { 60 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) 61 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) 62 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) 63 frame := wire.StreamFrame{ 64 Offset: 0, 65 Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, 66 } 67 err := str.handleStreamFrame(&frame) 68 Expect(err).ToNot(HaveOccurred()) 69 b := make([]byte, 2) 70 n, err := strWithTimeout.Read(b) 71 Expect(err).ToNot(HaveOccurred()) 72 Expect(n).To(Equal(2)) 73 Expect(b).To(Equal([]byte{0xDE, 0xAD})) 74 n, err = strWithTimeout.Read(b) 75 Expect(err).ToNot(HaveOccurred()) 76 Expect(n).To(Equal(2)) 77 Expect(b).To(Equal([]byte{0xBE, 0xEF})) 78 }) 79 80 It("reads all data available", func() { 81 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) 82 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) 83 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) 84 frame1 := wire.StreamFrame{ 85 Offset: 0, 86 Data: []byte{0xDE, 0xAD}, 87 } 88 frame2 := wire.StreamFrame{ 89 Offset: 2, 90 Data: []byte{0xBE, 0xEF}, 91 } 92 err := str.handleStreamFrame(&frame1) 93 Expect(err).ToNot(HaveOccurred()) 94 err = str.handleStreamFrame(&frame2) 95 Expect(err).ToNot(HaveOccurred()) 96 b := make([]byte, 6) 97 n, err := strWithTimeout.Read(b) 98 Expect(err).ToNot(HaveOccurred()) 99 Expect(n).To(Equal(4)) 100 Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00})) 101 }) 102 103 It("assembles multiple STREAM frames", func() { 104 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) 105 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) 106 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) 107 frame1 := wire.StreamFrame{ 108 Offset: 0, 109 Data: []byte{0xDE, 0xAD}, 110 } 111 frame2 := wire.StreamFrame{ 112 Offset: 2, 113 Data: []byte{0xBE, 0xEF}, 114 } 115 err := str.handleStreamFrame(&frame1) 116 Expect(err).ToNot(HaveOccurred()) 117 err = str.handleStreamFrame(&frame2) 118 Expect(err).ToNot(HaveOccurred()) 119 b := make([]byte, 4) 120 n, err := strWithTimeout.Read(b) 121 Expect(err).ToNot(HaveOccurred()) 122 Expect(n).To(Equal(4)) 123 Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) 124 }) 125 126 It("waits until data is available", func() { 127 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) 128 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) 129 go func() { 130 defer GinkgoRecover() 131 frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}} 132 time.Sleep(10 * time.Millisecond) 133 err := str.handleStreamFrame(&frame) 134 Expect(err).ToNot(HaveOccurred()) 135 }() 136 b := make([]byte, 2) 137 n, err := strWithTimeout.Read(b) 138 Expect(err).ToNot(HaveOccurred()) 139 Expect(n).To(Equal(2)) 140 }) 141 142 It("handles STREAM frames in wrong order", func() { 143 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) 144 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) 145 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) 146 frame1 := wire.StreamFrame{ 147 Offset: 2, 148 Data: []byte{0xBE, 0xEF}, 149 } 150 frame2 := wire.StreamFrame{ 151 Offset: 0, 152 Data: []byte{0xDE, 0xAD}, 153 } 154 err := str.handleStreamFrame(&frame1) 155 Expect(err).ToNot(HaveOccurred()) 156 err = str.handleStreamFrame(&frame2) 157 Expect(err).ToNot(HaveOccurred()) 158 b := make([]byte, 4) 159 n, err := strWithTimeout.Read(b) 160 Expect(err).ToNot(HaveOccurred()) 161 Expect(n).To(Equal(4)) 162 Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) 163 }) 164 165 It("ignores duplicate STREAM frames", func() { 166 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) 167 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) 168 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) 169 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) 170 frame1 := wire.StreamFrame{ 171 Offset: 0, 172 Data: []byte{0xDE, 0xAD}, 173 } 174 frame2 := wire.StreamFrame{ 175 Offset: 0, 176 Data: []byte{0x13, 0x37}, 177 } 178 frame3 := wire.StreamFrame{ 179 Offset: 2, 180 Data: []byte{0xBE, 0xEF}, 181 } 182 err := str.handleStreamFrame(&frame1) 183 Expect(err).ToNot(HaveOccurred()) 184 err = str.handleStreamFrame(&frame2) 185 Expect(err).ToNot(HaveOccurred()) 186 err = str.handleStreamFrame(&frame3) 187 Expect(err).ToNot(HaveOccurred()) 188 b := make([]byte, 4) 189 n, err := strWithTimeout.Read(b) 190 Expect(err).ToNot(HaveOccurred()) 191 Expect(n).To(Equal(4)) 192 Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) 193 }) 194 195 It("doesn't rejects a STREAM frames with an overlapping data range", func() { 196 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false) 197 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false) 198 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) 199 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)) 200 frame1 := wire.StreamFrame{ 201 Offset: 0, 202 Data: []byte("foob"), 203 } 204 frame2 := wire.StreamFrame{ 205 Offset: 2, 206 Data: []byte("obar"), 207 } 208 err := str.handleStreamFrame(&frame1) 209 Expect(err).ToNot(HaveOccurred()) 210 err = str.handleStreamFrame(&frame2) 211 Expect(err).ToNot(HaveOccurred()) 212 b := make([]byte, 6) 213 n, err := strWithTimeout.Read(b) 214 Expect(err).ToNot(HaveOccurred()) 215 Expect(n).To(Equal(6)) 216 Expect(b).To(Equal([]byte("foobar"))) 217 }) 218 219 Context("deadlines", func() { 220 It("the deadline error has the right net.Error properties", func() { 221 Expect(errDeadline.Temporary()).To(BeTrue()) 222 Expect(errDeadline.Timeout()).To(BeTrue()) 223 Expect(errDeadline).To(MatchError("deadline exceeded")) 224 }) 225 226 It("returns an error when Read is called after the deadline", func() { 227 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false).AnyTimes() 228 f := &wire.StreamFrame{Data: []byte("foobar")} 229 err := str.handleStreamFrame(f) 230 Expect(err).ToNot(HaveOccurred()) 231 str.SetReadDeadline(time.Now().Add(-time.Second)) 232 b := make([]byte, 6) 233 n, err := strWithTimeout.Read(b) 234 Expect(err).To(MatchError(errDeadline)) 235 Expect(n).To(BeZero()) 236 }) 237 238 It("unblocks when the deadline is changed to the past", func() { 239 str.SetReadDeadline(time.Now().Add(time.Hour)) 240 done := make(chan struct{}) 241 go func() { 242 defer GinkgoRecover() 243 _, err := str.Read(make([]byte, 6)) 244 Expect(err).To(MatchError(errDeadline)) 245 close(done) 246 }() 247 Consistently(done).ShouldNot(BeClosed()) 248 str.SetReadDeadline(time.Now().Add(-time.Hour)) 249 Eventually(done).Should(BeClosed()) 250 }) 251 252 It("unblocks after the deadline", func() { 253 deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) 254 str.SetReadDeadline(deadline) 255 b := make([]byte, 6) 256 n, err := strWithTimeout.Read(b) 257 Expect(err).To(MatchError(errDeadline)) 258 Expect(n).To(BeZero()) 259 Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(20*time.Millisecond))) 260 }) 261 262 It("doesn't unblock if the deadline is changed before the first one expires", func() { 263 deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond)) 264 deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond)) 265 str.SetReadDeadline(deadline1) 266 go func() { 267 defer GinkgoRecover() 268 time.Sleep(scaleDuration(20 * time.Millisecond)) 269 str.SetReadDeadline(deadline2) 270 // make sure that this was actually execute before the deadline expires 271 Expect(time.Now()).To(BeTemporally("<", deadline1)) 272 }() 273 runtime.Gosched() 274 b := make([]byte, 10) 275 n, err := strWithTimeout.Read(b) 276 Expect(err).To(MatchError(errDeadline)) 277 Expect(n).To(BeZero()) 278 Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond))) 279 }) 280 281 It("unblocks earlier, when a new deadline is set", func() { 282 deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond)) 283 deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond)) 284 go func() { 285 defer GinkgoRecover() 286 time.Sleep(scaleDuration(10 * time.Millisecond)) 287 str.SetReadDeadline(deadline2) 288 // make sure that this was actually execute before the deadline expires 289 Expect(time.Now()).To(BeTemporally("<", deadline2)) 290 }() 291 str.SetReadDeadline(deadline1) 292 runtime.Gosched() 293 b := make([]byte, 10) 294 _, err := strWithTimeout.Read(b) 295 Expect(err).To(MatchError(errDeadline)) 296 Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond))) 297 }) 298 299 It("doesn't unblock if the deadline is removed", func() { 300 deadline := time.Now().Add(scaleDuration(50 * time.Millisecond)) 301 str.SetReadDeadline(deadline) 302 deadlineUnset := make(chan struct{}) 303 go func() { 304 defer GinkgoRecover() 305 time.Sleep(scaleDuration(20 * time.Millisecond)) 306 str.SetReadDeadline(time.Time{}) 307 // make sure that this was actually execute before the deadline expires 308 Expect(time.Now()).To(BeTemporally("<", deadline)) 309 close(deadlineUnset) 310 }() 311 done := make(chan struct{}) 312 go func() { 313 defer GinkgoRecover() 314 _, err := strWithTimeout.Read(make([]byte, 1)) 315 Expect(err).To(MatchError("test done")) 316 close(done) 317 }() 318 runtime.Gosched() 319 Eventually(deadlineUnset).Should(BeClosed()) 320 Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed()) 321 // make the go routine return 322 str.closeForShutdown(errors.New("test done")) 323 Eventually(done).Should(BeClosed()) 324 }) 325 }) 326 327 Context("closing", func() { 328 Context("with FIN bit", func() { 329 It("returns EOFs", func() { 330 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true) 331 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4)) 332 str.handleStreamFrame(&wire.StreamFrame{ 333 Offset: 0, 334 Data: []byte{0xDE, 0xAD, 0xBE, 0xEF}, 335 Fin: true, 336 }) 337 mockSender.EXPECT().onStreamCompleted(streamID) 338 b := make([]byte, 4) 339 n, err := strWithTimeout.Read(b) 340 Expect(err).To(MatchError(io.EOF)) 341 Expect(n).To(Equal(4)) 342 Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) 343 n, err = strWithTimeout.Read(b) 344 Expect(n).To(BeZero()) 345 Expect(err).To(MatchError(io.EOF)) 346 }) 347 348 It("handles out-of-order frames", func() { 349 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false) 350 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true) 351 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2) 352 frame1 := wire.StreamFrame{ 353 Offset: 2, 354 Data: []byte{0xBE, 0xEF}, 355 Fin: true, 356 } 357 frame2 := wire.StreamFrame{ 358 Offset: 0, 359 Data: []byte{0xDE, 0xAD}, 360 } 361 err := str.handleStreamFrame(&frame1) 362 Expect(err).ToNot(HaveOccurred()) 363 err = str.handleStreamFrame(&frame2) 364 Expect(err).ToNot(HaveOccurred()) 365 mockSender.EXPECT().onStreamCompleted(streamID) 366 b := make([]byte, 4) 367 n, err := strWithTimeout.Read(b) 368 Expect(err).To(MatchError(io.EOF)) 369 Expect(n).To(Equal(4)) 370 Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF})) 371 n, err = strWithTimeout.Read(b) 372 Expect(n).To(BeZero()) 373 Expect(err).To(MatchError(io.EOF)) 374 }) 375 376 It("returns EOFs with partial read", func() { 377 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true) 378 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)) 379 err := str.handleStreamFrame(&wire.StreamFrame{ 380 Offset: 0, 381 Data: []byte{0xde, 0xad}, 382 Fin: true, 383 }) 384 Expect(err).ToNot(HaveOccurred()) 385 mockSender.EXPECT().onStreamCompleted(streamID) 386 b := make([]byte, 4) 387 n, err := strWithTimeout.Read(b) 388 Expect(err).To(MatchError(io.EOF)) 389 Expect(n).To(Equal(2)) 390 Expect(b[:n]).To(Equal([]byte{0xde, 0xad})) 391 }) 392 393 It("handles immediate FINs", func() { 394 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) 395 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0)) 396 err := str.handleStreamFrame(&wire.StreamFrame{ 397 Offset: 0, 398 Fin: true, 399 }) 400 Expect(err).ToNot(HaveOccurred()) 401 mockSender.EXPECT().onStreamCompleted(streamID) 402 b := make([]byte, 4) 403 n, err := strWithTimeout.Read(b) 404 Expect(n).To(BeZero()) 405 Expect(err).To(MatchError(io.EOF)) 406 }) 407 }) 408 409 It("closes when CloseRemote is called", func() { 410 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true) 411 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0)) 412 str.CloseRemote(0) 413 mockSender.EXPECT().onStreamCompleted(streamID) 414 b := make([]byte, 8) 415 n, err := strWithTimeout.Read(b) 416 Expect(n).To(BeZero()) 417 Expect(err).To(MatchError(io.EOF)) 418 }) 419 }) 420 421 Context("closing for shutdown", func() { 422 testErr := errors.New("test error") 423 424 It("immediately returns all reads", func() { 425 done := make(chan struct{}) 426 b := make([]byte, 4) 427 go func() { 428 defer GinkgoRecover() 429 n, err := strWithTimeout.Read(b) 430 Expect(n).To(BeZero()) 431 Expect(err).To(MatchError(testErr)) 432 close(done) 433 }() 434 Consistently(done).ShouldNot(BeClosed()) 435 str.closeForShutdown(testErr) 436 Eventually(done).Should(BeClosed()) 437 }) 438 439 It("errors for all following reads", func() { 440 str.closeForShutdown(testErr) 441 b := make([]byte, 1) 442 n, err := strWithTimeout.Read(b) 443 Expect(n).To(BeZero()) 444 Expect(err).To(MatchError(testErr)) 445 }) 446 }) 447 }) 448 449 Context("stream cancelations", func() { 450 Context("canceling read", func() { 451 It("unblocks Read", func() { 452 mockSender.EXPECT().queueControlFrame(gomock.Any()) 453 done := make(chan struct{}) 454 go func() { 455 defer GinkgoRecover() 456 _, err := strWithTimeout.Read([]byte{0}) 457 Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234")) 458 close(done) 459 }() 460 Consistently(done).ShouldNot(BeClosed()) 461 str.CancelRead(1234) 462 Eventually(done).Should(BeClosed()) 463 }) 464 465 It("doesn't allow further calls to Read", func() { 466 mockSender.EXPECT().queueControlFrame(gomock.Any()) 467 str.CancelRead(1234) 468 _, err := strWithTimeout.Read([]byte{0}) 469 Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234")) 470 }) 471 472 It("does nothing when CancelRead is called twice", func() { 473 mockSender.EXPECT().queueControlFrame(gomock.Any()) 474 str.CancelRead(1234) 475 str.CancelRead(1234) 476 _, err := strWithTimeout.Read([]byte{0}) 477 Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234")) 478 }) 479 480 It("queues a STOP_SENDING frame", func() { 481 mockSender.EXPECT().queueControlFrame(&wire.StopSendingFrame{ 482 StreamID: streamID, 483 ErrorCode: 1234, 484 }) 485 str.CancelRead(1234) 486 }) 487 488 It("doesn't send a STOP_SENDING frame, if the FIN was already read", func() { 489 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true) 490 mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6)) 491 // no calls to mockSender.queueControlFrame 492 Expect(str.handleStreamFrame(&wire.StreamFrame{ 493 StreamID: streamID, 494 Data: []byte("foobar"), 495 Fin: true, 496 })).To(Succeed()) 497 mockSender.EXPECT().onStreamCompleted(streamID) 498 _, err := strWithTimeout.Read(make([]byte, 100)) 499 Expect(err).To(MatchError(io.EOF)) 500 str.CancelRead(1234) 501 }) 502 503 It("doesn't send a STOP_SENDING frame, if the stream was already reset", func() { 504 gomock.InOrder( 505 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true), 506 mockFC.EXPECT().Abandon(), 507 ) 508 mockSender.EXPECT().onStreamCompleted(streamID) 509 Expect(str.handleResetStreamFrame(&wire.ResetStreamFrame{ 510 StreamID: streamID, 511 FinalSize: 42, 512 })).To(Succeed()) 513 str.CancelRead(1234) 514 }) 515 516 It("sends a STOP_SENDING and completes the stream after receiving the final offset", func() { 517 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true) 518 Expect(str.handleStreamFrame(&wire.StreamFrame{ 519 Offset: 1000, 520 Fin: true, 521 })).To(Succeed()) 522 mockFC.EXPECT().Abandon() 523 mockSender.EXPECT().queueControlFrame(gomock.Any()) 524 mockSender.EXPECT().onStreamCompleted(streamID) 525 str.CancelRead(1234) 526 }) 527 528 It("completes the stream when receiving the Fin after the stream was canceled", func() { 529 mockSender.EXPECT().queueControlFrame(gomock.Any()) 530 str.CancelRead(1234) 531 gomock.InOrder( 532 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true), 533 mockFC.EXPECT().Abandon(), 534 ) 535 mockSender.EXPECT().onStreamCompleted(streamID) 536 Expect(str.handleStreamFrame(&wire.StreamFrame{ 537 Offset: 1000, 538 Fin: true, 539 })).To(Succeed()) 540 }) 541 542 It("handles duplicate FinBits after the stream was canceled", func() { 543 mockSender.EXPECT().queueControlFrame(gomock.Any()) 544 str.CancelRead(1234) 545 gomock.InOrder( 546 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true), 547 mockFC.EXPECT().Abandon(), 548 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true), 549 ) 550 mockSender.EXPECT().onStreamCompleted(streamID) 551 Expect(str.handleStreamFrame(&wire.StreamFrame{ 552 Offset: 1000, 553 Fin: true, 554 })).To(Succeed()) 555 Expect(str.handleStreamFrame(&wire.StreamFrame{ 556 Offset: 1000, 557 Fin: true, 558 })).To(Succeed()) 559 }) 560 }) 561 562 Context("receiving RESET_STREAM frames", func() { 563 rst := &wire.ResetStreamFrame{ 564 StreamID: streamID, 565 FinalSize: 42, 566 ErrorCode: 1234, 567 } 568 569 It("unblocks Read", func() { 570 done := make(chan struct{}) 571 go func() { 572 defer GinkgoRecover() 573 _, err := strWithTimeout.Read([]byte{0}) 574 Expect(err).To(MatchError(&StreamError{ 575 StreamID: streamID, 576 ErrorCode: 1234, 577 })) 578 close(done) 579 }() 580 Consistently(done).ShouldNot(BeClosed()) 581 mockSender.EXPECT().onStreamCompleted(streamID) 582 gomock.InOrder( 583 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true), 584 mockFC.EXPECT().Abandon(), 585 ) 586 Expect(str.handleResetStreamFrame(rst)).To(Succeed()) 587 Eventually(done).Should(BeClosed()) 588 }) 589 590 It("doesn't allow further calls to Read", func() { 591 mockSender.EXPECT().onStreamCompleted(streamID) 592 gomock.InOrder( 593 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true), 594 mockFC.EXPECT().Abandon(), 595 ) 596 Expect(str.handleResetStreamFrame(rst)).To(Succeed()) 597 _, err := strWithTimeout.Read([]byte{0}) 598 Expect(err).To(MatchError(&StreamError{ 599 StreamID: streamID, 600 ErrorCode: 1234, 601 })) 602 }) 603 604 It("errors when receiving a RESET_STREAM with an inconsistent offset", func() { 605 testErr := errors.New("already received a different final offset before") 606 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Return(testErr) 607 err := str.handleResetStreamFrame(rst) 608 Expect(err).To(MatchError(testErr)) 609 }) 610 611 It("ignores duplicate RESET_STREAM frames", func() { 612 mockSender.EXPECT().onStreamCompleted(streamID) 613 mockFC.EXPECT().Abandon() 614 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2) 615 Expect(str.handleResetStreamFrame(rst)).To(Succeed()) 616 Expect(str.handleResetStreamFrame(rst)).To(Succeed()) 617 }) 618 619 It("doesn't call onStreamCompleted again when the final offset was already received via Fin", func() { 620 mockSender.EXPECT().queueControlFrame(gomock.Any()) 621 str.CancelRead(1234) 622 mockSender.EXPECT().onStreamCompleted(streamID) 623 mockFC.EXPECT().Abandon() 624 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2) 625 Expect(str.handleStreamFrame(&wire.StreamFrame{ 626 StreamID: streamID, 627 Offset: rst.FinalSize, 628 Fin: true, 629 })).To(Succeed()) 630 Expect(str.handleResetStreamFrame(rst)).To(Succeed()) 631 }) 632 633 It("doesn't do anyting when it was closed for shutdown", func() { 634 str.closeForShutdown(nil) 635 err := str.handleResetStreamFrame(rst) 636 Expect(err).ToNot(HaveOccurred()) 637 }) 638 }) 639 }) 640 641 Context("flow control", func() { 642 It("errors when a STREAM frame causes a flow control violation", func() { 643 testErr := errors.New("flow control violation") 644 mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), false).Return(testErr) 645 frame := wire.StreamFrame{ 646 Offset: 2, 647 Data: []byte("foobar"), 648 } 649 err := str.handleStreamFrame(&frame) 650 Expect(err).To(MatchError(testErr)) 651 }) 652 653 It("gets a window update", func() { 654 mockFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x100)) 655 Expect(str.getWindowUpdate()).To(Equal(protocol.ByteCount(0x100))) 656 }) 657 }) 658}) 659