1// Copyright 2016 The go-ethereum Authors 2// This file is part of the go-ethereum library. 3// 4// The go-ethereum library is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Lesser General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// The go-ethereum library is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Lesser General Public License for more details. 13// 14// You should have received a copy of the GNU Lesser General Public License 15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 16 17// Package flowcontrol implements a client side flow control mechanism 18package flowcontrol 19 20import ( 21 "fmt" 22 "math" 23 "sync" 24 "time" 25 26 "github.com/ethereum/go-ethereum/common/mclock" 27 "github.com/ethereum/go-ethereum/log" 28) 29 30const ( 31 // fcTimeConst is the time constant applied for MinRecharge during linear 32 // buffer recharge period 33 fcTimeConst = time.Millisecond 34 // DecParamDelay is applied at server side when decreasing capacity in order to 35 // avoid a buffer underrun error due to requests sent by the client before 36 // receiving the capacity update announcement 37 DecParamDelay = time.Second * 2 38 // keepLogs is the duration of keeping logs; logging is not used if zero 39 keepLogs = 0 40) 41 42// ServerParams are the flow control parameters specified by a server for a client 43// 44// Note: a server can assign different amounts of capacity to each client by giving 45// different parameters to them. 46type ServerParams struct { 47 BufLimit, MinRecharge uint64 48} 49 50// scheduledUpdate represents a delayed flow control parameter update 51type scheduledUpdate struct { 52 time mclock.AbsTime 53 params ServerParams 54} 55 56// ClientNode is the flow control system's representation of a client 57// (used in server mode only) 58type ClientNode struct { 59 params ServerParams 60 bufValue int64 61 lastTime mclock.AbsTime 62 updateSchedule []scheduledUpdate 63 sumCost uint64 // sum of req costs received from this client 64 accepted map[uint64]uint64 // value = sumCost after accepting the given req 65 connected bool 66 lock sync.Mutex 67 cm *ClientManager 68 log *logger 69 cmNodeFields 70} 71 72// NewClientNode returns a new ClientNode 73func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode { 74 node := &ClientNode{ 75 cm: cm, 76 params: params, 77 bufValue: int64(params.BufLimit), 78 lastTime: cm.clock.Now(), 79 accepted: make(map[uint64]uint64), 80 connected: true, 81 } 82 if keepLogs > 0 { 83 node.log = newLogger(keepLogs) 84 } 85 cm.connect(node) 86 return node 87} 88 89// Disconnect should be called when a client is disconnected 90func (node *ClientNode) Disconnect() { 91 node.lock.Lock() 92 defer node.lock.Unlock() 93 94 node.connected = false 95 node.cm.disconnect(node) 96} 97 98// BufferStatus returns the current buffer value and limit 99func (node *ClientNode) BufferStatus() (uint64, uint64) { 100 node.lock.Lock() 101 defer node.lock.Unlock() 102 103 if !node.connected { 104 return 0, 0 105 } 106 now := node.cm.clock.Now() 107 node.update(now) 108 node.cm.updateBuffer(node, 0, now) 109 bv := node.bufValue 110 if bv < 0 { 111 bv = 0 112 } 113 return uint64(bv), node.params.BufLimit 114} 115 116// OneTimeCost subtracts the given amount from the node's buffer. 117// 118// Note: this call can take the buffer into the negative region internally. 119// In this case zero buffer value is returned by exported calls and no requests 120// are accepted. 121func (node *ClientNode) OneTimeCost(cost uint64) { 122 node.lock.Lock() 123 defer node.lock.Unlock() 124 125 now := node.cm.clock.Now() 126 node.update(now) 127 node.bufValue -= int64(cost) 128 node.cm.updateBuffer(node, -int64(cost), now) 129} 130 131// Freeze notifies the client manager about a client freeze event in which case 132// the total capacity allowance is slightly reduced. 133func (node *ClientNode) Freeze() { 134 node.lock.Lock() 135 frozenCap := node.params.MinRecharge 136 node.lock.Unlock() 137 node.cm.reduceTotalCapacity(frozenCap) 138} 139 140// update recalculates the buffer value at a specified time while also performing 141// scheduled flow control parameter updates if necessary 142func (node *ClientNode) update(now mclock.AbsTime) { 143 for len(node.updateSchedule) > 0 && node.updateSchedule[0].time <= now { 144 node.recalcBV(node.updateSchedule[0].time) 145 node.updateParams(node.updateSchedule[0].params, now) 146 node.updateSchedule = node.updateSchedule[1:] 147 } 148 node.recalcBV(now) 149} 150 151// recalcBV recalculates the buffer value at a specified time 152func (node *ClientNode) recalcBV(now mclock.AbsTime) { 153 dt := uint64(now - node.lastTime) 154 if now < node.lastTime { 155 dt = 0 156 } 157 node.bufValue += int64(node.params.MinRecharge * dt / uint64(fcTimeConst)) 158 if node.bufValue > int64(node.params.BufLimit) { 159 node.bufValue = int64(node.params.BufLimit) 160 } 161 if node.log != nil { 162 node.log.add(now, fmt.Sprintf("updated bv=%d MRR=%d BufLimit=%d", node.bufValue, node.params.MinRecharge, node.params.BufLimit)) 163 } 164 node.lastTime = now 165} 166 167// UpdateParams updates the flow control parameters of a client node 168func (node *ClientNode) UpdateParams(params ServerParams) { 169 node.lock.Lock() 170 defer node.lock.Unlock() 171 172 now := node.cm.clock.Now() 173 node.update(now) 174 if params.MinRecharge >= node.params.MinRecharge { 175 node.updateSchedule = nil 176 node.updateParams(params, now) 177 } else { 178 for i, s := range node.updateSchedule { 179 if params.MinRecharge >= s.params.MinRecharge { 180 s.params = params 181 node.updateSchedule = node.updateSchedule[:i+1] 182 return 183 } 184 } 185 node.updateSchedule = append(node.updateSchedule, scheduledUpdate{time: now + mclock.AbsTime(DecParamDelay), params: params}) 186 } 187} 188 189// updateParams updates the flow control parameters of the node 190func (node *ClientNode) updateParams(params ServerParams, now mclock.AbsTime) { 191 diff := int64(params.BufLimit - node.params.BufLimit) 192 if diff > 0 { 193 node.bufValue += diff 194 } else if node.bufValue > int64(params.BufLimit) { 195 node.bufValue = int64(params.BufLimit) 196 } 197 node.cm.updateParams(node, params, now) 198} 199 200// AcceptRequest returns whether a new request can be accepted and the missing 201// buffer amount if it was rejected due to a buffer underrun. If accepted, maxCost 202// is deducted from the flow control buffer. 203func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bool, bufShort uint64, priority int64) { 204 node.lock.Lock() 205 defer node.lock.Unlock() 206 207 now := node.cm.clock.Now() 208 node.update(now) 209 if int64(maxCost) > node.bufValue { 210 if node.log != nil { 211 node.log.add(now, fmt.Sprintf("rejected reqID=%d bv=%d maxCost=%d", reqID, node.bufValue, maxCost)) 212 node.log.dump(now) 213 } 214 return false, maxCost - uint64(node.bufValue), 0 215 } 216 node.bufValue -= int64(maxCost) 217 node.sumCost += maxCost 218 if node.log != nil { 219 node.log.add(now, fmt.Sprintf("accepted reqID=%d bv=%d maxCost=%d sumCost=%d", reqID, node.bufValue, maxCost, node.sumCost)) 220 } 221 node.accepted[index] = node.sumCost 222 return true, 0, node.cm.accepted(node, maxCost, now) 223} 224 225// RequestProcessed should be called when the request has been processed 226func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64 { 227 node.lock.Lock() 228 defer node.lock.Unlock() 229 230 now := node.cm.clock.Now() 231 node.update(now) 232 node.cm.processed(node, maxCost, realCost, now) 233 bv := node.bufValue + int64(node.sumCost-node.accepted[index]) 234 if node.log != nil { 235 node.log.add(now, fmt.Sprintf("processed reqID=%d bv=%d maxCost=%d realCost=%d sumCost=%d oldSumCost=%d reportedBV=%d", reqID, node.bufValue, maxCost, realCost, node.sumCost, node.accepted[index], bv)) 236 } 237 delete(node.accepted, index) 238 if bv < 0 { 239 return 0 240 } 241 return uint64(bv) 242} 243 244// ServerNode is the flow control system's representation of a server 245// (used in client mode only) 246type ServerNode struct { 247 clock mclock.Clock 248 bufEstimate uint64 249 bufRecharge bool 250 lastTime mclock.AbsTime 251 params ServerParams 252 sumCost uint64 // sum of req costs sent to this server 253 pending map[uint64]uint64 // value = sumCost after sending the given req 254 log *logger 255 lock sync.RWMutex 256} 257 258// NewServerNode returns a new ServerNode 259func NewServerNode(params ServerParams, clock mclock.Clock) *ServerNode { 260 node := &ServerNode{ 261 clock: clock, 262 bufEstimate: params.BufLimit, 263 bufRecharge: false, 264 lastTime: clock.Now(), 265 params: params, 266 pending: make(map[uint64]uint64), 267 } 268 if keepLogs > 0 { 269 node.log = newLogger(keepLogs) 270 } 271 return node 272} 273 274// UpdateParams updates the flow control parameters of the node 275func (node *ServerNode) UpdateParams(params ServerParams) { 276 node.lock.Lock() 277 defer node.lock.Unlock() 278 279 node.recalcBLE(mclock.Now()) 280 if params.BufLimit > node.params.BufLimit { 281 node.bufEstimate += params.BufLimit - node.params.BufLimit 282 } else { 283 if node.bufEstimate > params.BufLimit { 284 node.bufEstimate = params.BufLimit 285 } 286 } 287 node.params = params 288} 289 290// recalcBLE recalculates the lowest estimate for the client's buffer value at 291// the given server at the specified time 292func (node *ServerNode) recalcBLE(now mclock.AbsTime) { 293 if now < node.lastTime { 294 return 295 } 296 if node.bufRecharge { 297 dt := uint64(now - node.lastTime) 298 node.bufEstimate += node.params.MinRecharge * dt / uint64(fcTimeConst) 299 if node.bufEstimate >= node.params.BufLimit { 300 node.bufEstimate = node.params.BufLimit 301 node.bufRecharge = false 302 } 303 } 304 node.lastTime = now 305 if node.log != nil { 306 node.log.add(now, fmt.Sprintf("updated bufEst=%d MRR=%d BufLimit=%d", node.bufEstimate, node.params.MinRecharge, node.params.BufLimit)) 307 } 308} 309 310// safetyMargin is added to the flow control waiting time when estimated buffer value is low 311const safetyMargin = time.Millisecond 312 313// CanSend returns the minimum waiting time required before sending a request 314// with the given maximum estimated cost. Second return value is the relative 315// estimated buffer level after sending the request (divided by BufLimit). 316func (node *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) { 317 node.lock.RLock() 318 defer node.lock.RUnlock() 319 320 if node.params.BufLimit == 0 { 321 return time.Duration(math.MaxInt64), 0 322 } 323 now := node.clock.Now() 324 node.recalcBLE(now) 325 maxCost += uint64(safetyMargin) * node.params.MinRecharge / uint64(fcTimeConst) 326 if maxCost > node.params.BufLimit { 327 maxCost = node.params.BufLimit 328 } 329 if node.bufEstimate >= maxCost { 330 relBuf := float64(node.bufEstimate-maxCost) / float64(node.params.BufLimit) 331 if node.log != nil { 332 node.log.add(now, fmt.Sprintf("canSend bufEst=%d maxCost=%d true relBuf=%f", node.bufEstimate, maxCost, relBuf)) 333 } 334 return 0, relBuf 335 } 336 timeLeft := time.Duration((maxCost - node.bufEstimate) * uint64(fcTimeConst) / node.params.MinRecharge) 337 if node.log != nil { 338 node.log.add(now, fmt.Sprintf("canSend bufEst=%d maxCost=%d false timeLeft=%v", node.bufEstimate, maxCost, timeLeft)) 339 } 340 return timeLeft, 0 341} 342 343// QueuedRequest should be called when the request has been assigned to the given 344// server node, before putting it in the send queue. It is mandatory that requests 345// are sent in the same order as the QueuedRequest calls are made. 346func (node *ServerNode) QueuedRequest(reqID, maxCost uint64) { 347 node.lock.Lock() 348 defer node.lock.Unlock() 349 350 now := node.clock.Now() 351 node.recalcBLE(now) 352 // Note: we do not know when requests actually arrive to the server so bufRecharge 353 // is not turned on here if buffer was full; in this case it is going to be turned 354 // on by the first reply's bufValue feedback 355 if node.bufEstimate >= maxCost { 356 node.bufEstimate -= maxCost 357 } else { 358 log.Error("Queued request with insufficient buffer estimate") 359 node.bufEstimate = 0 360 } 361 node.sumCost += maxCost 362 node.pending[reqID] = node.sumCost 363 if node.log != nil { 364 node.log.add(now, fmt.Sprintf("queued reqID=%d bufEst=%d maxCost=%d sumCost=%d", reqID, node.bufEstimate, maxCost, node.sumCost)) 365 } 366} 367 368// ReceivedReply adjusts estimated buffer value according to the value included in 369// the latest request reply. 370func (node *ServerNode) ReceivedReply(reqID, bv uint64) { 371 node.lock.Lock() 372 defer node.lock.Unlock() 373 374 now := node.clock.Now() 375 node.recalcBLE(now) 376 if bv > node.params.BufLimit { 377 bv = node.params.BufLimit 378 } 379 sc, ok := node.pending[reqID] 380 if !ok { 381 return 382 } 383 delete(node.pending, reqID) 384 cc := node.sumCost - sc 385 newEstimate := uint64(0) 386 if bv > cc { 387 newEstimate = bv - cc 388 } 389 if newEstimate > node.bufEstimate { 390 // Note: we never reduce the buffer estimate based on the reported value because 391 // this can only happen because of the delayed delivery of the latest reply. 392 // The lowest estimate based on the previous reply can still be considered valid. 393 node.bufEstimate = newEstimate 394 } 395 396 node.bufRecharge = node.bufEstimate < node.params.BufLimit 397 node.lastTime = now 398 if node.log != nil { 399 node.log.add(now, fmt.Sprintf("received reqID=%d bufEst=%d reportedBv=%d sumCost=%d oldSumCost=%d", reqID, node.bufEstimate, bv, node.sumCost, sc)) 400 } 401} 402 403// ResumeFreeze cleans all pending requests and sets the buffer estimate to the 404// reported value after resuming from a frozen state 405func (node *ServerNode) ResumeFreeze(bv uint64) { 406 node.lock.Lock() 407 defer node.lock.Unlock() 408 409 for reqID := range node.pending { 410 delete(node.pending, reqID) 411 } 412 now := node.clock.Now() 413 node.recalcBLE(now) 414 if bv > node.params.BufLimit { 415 bv = node.params.BufLimit 416 } 417 node.bufEstimate = bv 418 node.bufRecharge = node.bufEstimate < node.params.BufLimit 419 node.lastTime = now 420 if node.log != nil { 421 node.log.add(now, fmt.Sprintf("unfreeze bv=%d sumCost=%d", bv, node.sumCost)) 422 } 423} 424 425// DumpLogs dumps the event log if logging is used 426func (node *ServerNode) DumpLogs() { 427 node.lock.Lock() 428 defer node.lock.Unlock() 429 430 if node.log != nil { 431 node.log.dump(node.clock.Now()) 432 } 433} 434