1// Copyright 2020 The go-ethereum Authors 2// This file is part of the go-ethereum library. 3// 4// The go-ethereum library is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Lesser General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// The go-ethereum library is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Lesser General Public License for more details. 13// 14// You should have received a copy of the GNU Lesser General Public License 15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. 16 17package server 18 19import ( 20 "sync" 21 "time" 22 23 "github.com/ethereum/go-ethereum/common/mclock" 24 "github.com/ethereum/go-ethereum/ethdb" 25 "github.com/ethereum/go-ethereum/les/utils" 26 "github.com/ethereum/go-ethereum/p2p/enode" 27 "github.com/ethereum/go-ethereum/p2p/enr" 28 "github.com/ethereum/go-ethereum/p2p/nodestate" 29) 30 31const ( 32 posThreshold = 1000000 // minimum positive balance that is persisted in the database 33 negThreshold = 1000000 // minimum negative balance that is persisted in the database 34 persistExpirationRefresh = time.Minute * 5 // refresh period of the token expiration persistence 35) 36 37// balanceTracker tracks positive and negative balances for connected nodes. 38// After clientField is set externally, a nodeBalance is created and previous 39// balance values are loaded from the database. Both balances are exponentially expired 40// values. Costs are deducted from the positive balance if present, otherwise added to 41// the negative balance. If the capacity is non-zero then a time cost is applied 42// continuously while individual request costs are applied immediately. 43// The two balances are translated into a single priority value that also depends 44// on the actual capacity. 45type balanceTracker struct { 46 setup *serverSetup 47 clock mclock.Clock 48 lock sync.Mutex 49 ns *nodestate.NodeStateMachine 50 ndb *nodeDB 51 posExp, negExp utils.ValueExpirer 52 53 posExpTC, negExpTC uint64 54 defaultPosFactors, defaultNegFactors PriceFactors 55 56 active, inactive utils.ExpiredValue 57 balanceTimer *utils.UpdateTimer 58 quit chan struct{} 59} 60 61// newBalanceTracker creates a new balanceTracker 62func newBalanceTracker(ns *nodestate.NodeStateMachine, setup *serverSetup, db ethdb.KeyValueStore, clock mclock.Clock, posExp, negExp utils.ValueExpirer) *balanceTracker { 63 ndb := newNodeDB(db, clock) 64 bt := &balanceTracker{ 65 ns: ns, 66 setup: setup, 67 ndb: ndb, 68 clock: clock, 69 posExp: posExp, 70 negExp: negExp, 71 balanceTimer: utils.NewUpdateTimer(clock, time.Second*10), 72 quit: make(chan struct{}), 73 } 74 posOffset, negOffset := bt.ndb.getExpiration() 75 posExp.SetLogOffset(clock.Now(), posOffset) 76 negExp.SetLogOffset(clock.Now(), negOffset) 77 78 // Load all persisted balance entries of priority nodes, 79 // calculate the total number of issued service tokens. 80 bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool { 81 bt.inactive.AddExp(balance) 82 return true 83 }) 84 85 ns.SubscribeField(bt.setup.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) { 86 n, _ := ns.GetField(node, bt.setup.balanceField).(*nodeBalance) 87 if n == nil { 88 return 89 } 90 91 ov, _ := oldValue.(uint64) 92 nv, _ := newValue.(uint64) 93 if ov == 0 && nv != 0 { 94 n.activate() 95 } 96 if nv != 0 { 97 n.setCapacity(nv) 98 } 99 if ov != 0 && nv == 0 { 100 n.deactivate() 101 } 102 }) 103 ns.SubscribeField(bt.setup.clientField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) { 104 type peer interface { 105 FreeClientId() string 106 } 107 if newValue != nil { 108 n := bt.newNodeBalance(node, newValue.(peer).FreeClientId(), true) 109 bt.lock.Lock() 110 n.SetPriceFactors(bt.defaultPosFactors, bt.defaultNegFactors) 111 bt.lock.Unlock() 112 ns.SetFieldSub(node, bt.setup.balanceField, n) 113 } else { 114 ns.SetStateSub(node, nodestate.Flags{}, bt.setup.priorityFlag, 0) 115 if b, _ := ns.GetField(node, bt.setup.balanceField).(*nodeBalance); b != nil { 116 b.deactivate() 117 } 118 ns.SetFieldSub(node, bt.setup.balanceField, nil) 119 } 120 }) 121 122 // The positive and negative balances of clients are stored in database 123 // and both of these decay exponentially over time. Delete them if the 124 // value is small enough. 125 bt.ndb.evictCallBack = bt.canDropBalance 126 127 go func() { 128 for { 129 select { 130 case <-clock.After(persistExpirationRefresh): 131 now := clock.Now() 132 bt.ndb.setExpiration(posExp.LogOffset(now), negExp.LogOffset(now)) 133 case <-bt.quit: 134 return 135 } 136 } 137 }() 138 return bt 139} 140 141// Stop saves expiration offset and unsaved node balances and shuts balanceTracker down 142func (bt *balanceTracker) stop() { 143 now := bt.clock.Now() 144 bt.ndb.setExpiration(bt.posExp.LogOffset(now), bt.negExp.LogOffset(now)) 145 close(bt.quit) 146 bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) { 147 if n, ok := bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance); ok { 148 n.lock.Lock() 149 n.storeBalance(true, true) 150 n.lock.Unlock() 151 bt.ns.SetField(node, bt.setup.balanceField, nil) 152 } 153 }) 154 bt.ndb.close() 155} 156 157// TotalTokenAmount returns the current total amount of service tokens in existence 158func (bt *balanceTracker) TotalTokenAmount() uint64 { 159 bt.lock.Lock() 160 defer bt.lock.Unlock() 161 162 bt.balanceTimer.Update(func(_ time.Duration) bool { 163 bt.active = utils.ExpiredValue{} 164 bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) { 165 if n, ok := bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance); ok && n.active { 166 pos, _ := n.GetRawBalance() 167 bt.active.AddExp(pos) 168 } 169 }) 170 return true 171 }) 172 total := bt.active 173 total.AddExp(bt.inactive) 174 return total.Value(bt.posExp.LogOffset(bt.clock.Now())) 175} 176 177// GetPosBalanceIDs lists node IDs with an associated positive balance 178func (bt *balanceTracker) GetPosBalanceIDs(start, stop enode.ID, maxCount int) (result []enode.ID) { 179 return bt.ndb.getPosBalanceIDs(start, stop, maxCount) 180} 181 182// SetDefaultFactors sets the default price factors applied to subsequently connected clients 183func (bt *balanceTracker) SetDefaultFactors(posFactors, negFactors PriceFactors) { 184 bt.lock.Lock() 185 bt.defaultPosFactors = posFactors 186 bt.defaultNegFactors = negFactors 187 bt.lock.Unlock() 188} 189 190// SetExpirationTCs sets positive and negative token expiration time constants. 191// Specified in seconds, 0 means infinite (no expiration). 192func (bt *balanceTracker) SetExpirationTCs(pos, neg uint64) { 193 bt.lock.Lock() 194 defer bt.lock.Unlock() 195 196 bt.posExpTC, bt.negExpTC = pos, neg 197 now := bt.clock.Now() 198 if pos > 0 { 199 bt.posExp.SetRate(now, 1/float64(pos*uint64(time.Second))) 200 } else { 201 bt.posExp.SetRate(now, 0) 202 } 203 if neg > 0 { 204 bt.negExp.SetRate(now, 1/float64(neg*uint64(time.Second))) 205 } else { 206 bt.negExp.SetRate(now, 0) 207 } 208} 209 210// GetExpirationTCs returns the current positive and negative token expiration 211// time constants 212func (bt *balanceTracker) GetExpirationTCs() (pos, neg uint64) { 213 bt.lock.Lock() 214 defer bt.lock.Unlock() 215 216 return bt.posExpTC, bt.negExpTC 217} 218 219// BalanceOperation allows atomic operations on the balance of a node regardless of whether 220// it is currently connected or not 221func (bt *balanceTracker) BalanceOperation(id enode.ID, connAddress string, cb func(AtomicBalanceOperator)) { 222 bt.ns.Operation(func() { 223 var nb *nodeBalance 224 if node := bt.ns.GetNode(id); node != nil { 225 nb, _ = bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance) 226 } 227 if nb == nil { 228 node := enode.SignNull(&enr.Record{}, id) 229 nb = bt.newNodeBalance(node, connAddress, false) 230 } 231 cb(nb) 232 }) 233} 234 235// newNodeBalance loads balances from the database and creates a nodeBalance instance 236// for the given node. It also sets the priorityFlag and adds balanceCallbackZero if 237// the node has a positive balance. 238// Note: this function should run inside a NodeStateMachine operation 239func (bt *balanceTracker) newNodeBalance(node *enode.Node, connAddress string, setFlags bool) *nodeBalance { 240 pb := bt.ndb.getOrNewBalance(node.ID().Bytes(), false) 241 nb := bt.ndb.getOrNewBalance([]byte(connAddress), true) 242 n := &nodeBalance{ 243 bt: bt, 244 node: node, 245 setFlags: setFlags, 246 connAddress: connAddress, 247 balance: balance{pos: pb, neg: nb, posExp: bt.posExp, negExp: bt.negExp}, 248 initTime: bt.clock.Now(), 249 lastUpdate: bt.clock.Now(), 250 } 251 for i := range n.callbackIndex { 252 n.callbackIndex[i] = -1 253 } 254 if setFlags && n.checkPriorityStatus() { 255 n.bt.ns.SetStateSub(n.node, n.bt.setup.priorityFlag, nodestate.Flags{}, 0) 256 } 257 return n 258} 259 260// storeBalance stores either a positive or a negative balance in the database 261func (bt *balanceTracker) storeBalance(id []byte, neg bool, value utils.ExpiredValue) { 262 if bt.canDropBalance(bt.clock.Now(), neg, value) { 263 bt.ndb.delBalance(id, neg) // balance is small enough, drop it directly. 264 } else { 265 bt.ndb.setBalance(id, neg, value) 266 } 267} 268 269// canDropBalance tells whether a positive or negative balance is below the threshold 270// and therefore can be dropped from the database 271func (bt *balanceTracker) canDropBalance(now mclock.AbsTime, neg bool, b utils.ExpiredValue) bool { 272 if neg { 273 return b.Value(bt.negExp.LogOffset(now)) <= negThreshold 274 } 275 return b.Value(bt.posExp.LogOffset(now)) <= posThreshold 276} 277 278// updateTotalBalance adjusts the total balance after executing given callback. 279func (bt *balanceTracker) updateTotalBalance(n *nodeBalance, callback func() bool) { 280 bt.lock.Lock() 281 defer bt.lock.Unlock() 282 283 n.lock.Lock() 284 defer n.lock.Unlock() 285 286 original, active := n.balance.pos, n.active 287 if !callback() { 288 return 289 } 290 if active { 291 bt.active.SubExp(original) 292 } else { 293 bt.inactive.SubExp(original) 294 } 295 if n.active { 296 bt.active.AddExp(n.balance.pos) 297 } else { 298 bt.inactive.AddExp(n.balance.pos) 299 } 300} 301