1package flowcontrol 2 3import ( 4 "time" 5 6 "github.com/lucas-clemente/quic-go/internal/protocol" 7 "github.com/lucas-clemente/quic-go/internal/utils" 8 . "github.com/onsi/ginkgo" 9 . "github.com/onsi/gomega" 10) 11 12var _ = Describe("Connection Flow controller", func() { 13 var ( 14 controller *connectionFlowController 15 queuedWindowUpdate bool 16 ) 17 18 // update the congestion such that it returns a given value for the smoothed RTT 19 setRtt := func(t time.Duration) { 20 controller.rttStats.UpdateRTT(t, 0, time.Now()) 21 Expect(controller.rttStats.SmoothedRTT()).To(Equal(t)) // make sure it worked 22 } 23 24 BeforeEach(func() { 25 queuedWindowUpdate = false 26 controller = &connectionFlowController{} 27 controller.rttStats = &utils.RTTStats{} 28 controller.logger = utils.DefaultLogger 29 controller.queueWindowUpdate = func() { queuedWindowUpdate = true } 30 }) 31 32 Context("Constructor", func() { 33 rttStats := &utils.RTTStats{} 34 35 It("sets the send and receive windows", func() { 36 receiveWindow := protocol.ByteCount(2000) 37 maxReceiveWindow := protocol.ByteCount(3000) 38 39 fc := NewConnectionFlowController(receiveWindow, maxReceiveWindow, nil, rttStats, utils.DefaultLogger).(*connectionFlowController) 40 Expect(fc.receiveWindow).To(Equal(receiveWindow)) 41 Expect(fc.maxReceiveWindowSize).To(Equal(maxReceiveWindow)) 42 }) 43 }) 44 45 Context("receive flow control", func() { 46 It("increases the highestReceived by a given window size", func() { 47 controller.highestReceived = 1337 48 controller.IncrementHighestReceived(123) 49 Expect(controller.highestReceived).To(Equal(protocol.ByteCount(1337 + 123))) 50 }) 51 52 Context("getting window updates", func() { 53 BeforeEach(func() { 54 controller.receiveWindow = 100 55 controller.receiveWindowSize = 60 56 controller.maxReceiveWindowSize = 1000 57 controller.bytesRead = 100 - 60 58 }) 59 60 It("queues window updates", func() { 61 controller.AddBytesRead(1) 62 Expect(queuedWindowUpdate).To(BeFalse()) 63 controller.AddBytesRead(29) 64 Expect(queuedWindowUpdate).To(BeTrue()) 65 Expect(controller.GetWindowUpdate()).ToNot(BeZero()) 66 queuedWindowUpdate = false 67 controller.AddBytesRead(1) 68 Expect(queuedWindowUpdate).To(BeFalse()) 69 }) 70 71 It("gets a window update", func() { 72 windowSize := controller.receiveWindowSize 73 oldOffset := controller.bytesRead 74 dataRead := windowSize/2 - 1 // make sure not to trigger auto-tuning 75 controller.AddBytesRead(dataRead) 76 offset := controller.GetWindowUpdate() 77 Expect(offset).To(Equal(oldOffset + dataRead + 60)) 78 }) 79 80 It("autotunes the window", func() { 81 oldOffset := controller.bytesRead 82 oldWindowSize := controller.receiveWindowSize 83 rtt := scaleDuration(20 * time.Millisecond) 84 setRtt(rtt) 85 controller.epochStartTime = time.Now().Add(-time.Millisecond) 86 controller.epochStartOffset = oldOffset 87 dataRead := oldWindowSize/2 + 1 88 controller.AddBytesRead(dataRead) 89 offset := controller.GetWindowUpdate() 90 newWindowSize := controller.receiveWindowSize 91 Expect(newWindowSize).To(Equal(2 * oldWindowSize)) 92 Expect(offset).To(Equal(oldOffset + dataRead + newWindowSize)) 93 }) 94 }) 95 }) 96 97 Context("setting the minimum window size", func() { 98 var ( 99 oldWindowSize protocol.ByteCount 100 receiveWindow protocol.ByteCount = 10000 101 receiveWindowSize protocol.ByteCount = 1000 102 ) 103 104 BeforeEach(func() { 105 controller.receiveWindow = receiveWindow 106 controller.receiveWindowSize = receiveWindowSize 107 oldWindowSize = controller.receiveWindowSize 108 controller.maxReceiveWindowSize = 3000 109 }) 110 111 It("sets the minimum window window size", func() { 112 controller.EnsureMinimumWindowSize(1800) 113 Expect(controller.receiveWindowSize).To(Equal(protocol.ByteCount(1800))) 114 }) 115 116 It("doesn't reduce the window window size", func() { 117 controller.EnsureMinimumWindowSize(1) 118 Expect(controller.receiveWindowSize).To(Equal(oldWindowSize)) 119 }) 120 121 It("doens't increase the window size beyond the maxReceiveWindowSize", func() { 122 max := controller.maxReceiveWindowSize 123 controller.EnsureMinimumWindowSize(2 * max) 124 Expect(controller.receiveWindowSize).To(Equal(max)) 125 }) 126 127 It("starts a new epoch after the window size was increased", func() { 128 controller.EnsureMinimumWindowSize(1912) 129 Expect(controller.epochStartTime).To(BeTemporally("~", time.Now(), 100*time.Millisecond)) 130 }) 131 }) 132 133 Context("resetting", func() { 134 It("resets", func() { 135 const initialWindow protocol.ByteCount = 1337 136 controller.UpdateSendWindow(initialWindow) 137 controller.AddBytesSent(1000) 138 Expect(controller.SendWindowSize()).To(Equal(initialWindow - 1000)) 139 Expect(controller.Reset()).To(Succeed()) 140 Expect(controller.SendWindowSize()).To(Equal(initialWindow)) 141 }) 142 143 It("says if is blocked after resetting", func() { 144 const initialWindow protocol.ByteCount = 1337 145 controller.UpdateSendWindow(initialWindow) 146 controller.AddBytesSent(initialWindow) 147 blocked, _ := controller.IsNewlyBlocked() 148 Expect(blocked).To(BeTrue()) 149 Expect(controller.Reset()).To(Succeed()) 150 controller.AddBytesSent(initialWindow) 151 blocked, blockedAt := controller.IsNewlyBlocked() 152 Expect(blocked).To(BeTrue()) 153 Expect(blockedAt).To(Equal(initialWindow)) 154 }) 155 }) 156}) 157