1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "fmt" 23 "math" 24 "sync" 25 "sync/atomic" 26) 27 28// writeQuota is a soft limit on the amount of data a stream can 29// schedule before some of it is written out. 30type writeQuota struct { 31 quota int32 32 // get waits on read from when quota goes less than or equal to zero. 33 // replenish writes on it when quota goes positive again. 34 ch chan struct{} 35 // done is triggered in error case. 36 done <-chan struct{} 37 // replenish is called by loopyWriter to give quota back to. 38 // It is implemented as a field so that it can be updated 39 // by tests. 40 replenish func(n int) 41} 42 43func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota { 44 w := &writeQuota{ 45 quota: sz, 46 ch: make(chan struct{}, 1), 47 done: done, 48 } 49 w.replenish = w.realReplenish 50 return w 51} 52 53func (w *writeQuota) get(sz int32) error { 54 for { 55 if atomic.LoadInt32(&w.quota) > 0 { 56 atomic.AddInt32(&w.quota, -sz) 57 return nil 58 } 59 select { 60 case <-w.ch: 61 continue 62 case <-w.done: 63 return errStreamDone 64 } 65 } 66} 67 68func (w *writeQuota) realReplenish(n int) { 69 sz := int32(n) 70 a := atomic.AddInt32(&w.quota, sz) 71 b := a - sz 72 if b <= 0 && a > 0 { 73 select { 74 case w.ch <- struct{}{}: 75 default: 76 } 77 } 78} 79 80type trInFlow struct { 81 limit uint32 82 unacked uint32 83 effectiveWindowSize uint32 84} 85 86func (f *trInFlow) newLimit(n uint32) uint32 { 87 d := n - f.limit 88 f.limit = n 89 f.updateEffectiveWindowSize() 90 return d 91} 92 93func (f *trInFlow) onData(n uint32) uint32 { 94 f.unacked += n 95 if f.unacked >= f.limit/4 { 96 w := f.unacked 97 f.unacked = 0 98 f.updateEffectiveWindowSize() 99 return w 100 } 101 f.updateEffectiveWindowSize() 102 return 0 103} 104 105func (f *trInFlow) reset() uint32 { 106 w := f.unacked 107 f.unacked = 0 108 f.updateEffectiveWindowSize() 109 return w 110} 111 112func (f *trInFlow) updateEffectiveWindowSize() { 113 atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked) 114} 115 116func (f *trInFlow) getSize() uint32 { 117 return atomic.LoadUint32(&f.effectiveWindowSize) 118} 119 120// TODO(mmukhi): Simplify this code. 121// inFlow deals with inbound flow control 122type inFlow struct { 123 mu sync.Mutex 124 // The inbound flow control limit for pending data. 125 limit uint32 126 // pendingData is the overall data which have been received but not been 127 // consumed by applications. 128 pendingData uint32 129 // The amount of data the application has consumed but grpc has not sent 130 // window update for them. Used to reduce window update frequency. 131 pendingUpdate uint32 132 // delta is the extra window update given by receiver when an application 133 // is reading data bigger in size than the inFlow limit. 134 delta uint32 135} 136 137// newLimit updates the inflow window to a new value n. 138// It assumes that n is always greater than the old limit. 139func (f *inFlow) newLimit(n uint32) uint32 { 140 f.mu.Lock() 141 d := n - f.limit 142 f.limit = n 143 f.mu.Unlock() 144 return d 145} 146 147func (f *inFlow) maybeAdjust(n uint32) uint32 { 148 if n > uint32(math.MaxInt32) { 149 n = uint32(math.MaxInt32) 150 } 151 f.mu.Lock() 152 defer f.mu.Unlock() 153 // estSenderQuota is the receiver's view of the maximum number of bytes the sender 154 // can send without a window update. 155 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) 156 // estUntransmittedData is the maximum number of bytes the sends might not have put 157 // on the wire yet. A value of 0 or less means that we have already received all or 158 // more bytes than the application is requesting to read. 159 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. 160 // This implies that unless we send a window update, the sender won't be able to send all the bytes 161 // for this message. Therefore we must send an update over the limit since there's an active read 162 // request from the application. 163 if estUntransmittedData > estSenderQuota { 164 // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec. 165 if f.limit+n > maxWindowSize { 166 f.delta = maxWindowSize - f.limit 167 } else { 168 // Send a window update for the whole message and not just the difference between 169 // estUntransmittedData and estSenderQuota. This will be helpful in case the message 170 // is padded; We will fallback on the current available window(at least a 1/4th of the limit). 171 f.delta = n 172 } 173 return f.delta 174 } 175 return 0 176} 177 178// onData is invoked when some data frame is received. It updates pendingData. 179func (f *inFlow) onData(n uint32) error { 180 f.mu.Lock() 181 f.pendingData += n 182 if f.pendingData+f.pendingUpdate > f.limit+f.delta { 183 limit := f.limit 184 rcvd := f.pendingData + f.pendingUpdate 185 f.mu.Unlock() 186 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit) 187 } 188 f.mu.Unlock() 189 return nil 190} 191 192// onRead is invoked when the application reads the data. It returns the window size 193// to be sent to the peer. 194func (f *inFlow) onRead(n uint32) uint32 { 195 f.mu.Lock() 196 if f.pendingData == 0 { 197 f.mu.Unlock() 198 return 0 199 } 200 f.pendingData -= n 201 if n > f.delta { 202 n -= f.delta 203 f.delta = 0 204 } else { 205 f.delta -= n 206 n = 0 207 } 208 f.pendingUpdate += n 209 if f.pendingUpdate >= f.limit/4 { 210 wu := f.pendingUpdate 211 f.pendingUpdate = 0 212 f.mu.Unlock() 213 return wu 214 } 215 f.mu.Unlock() 216 return 0 217} 218