1// Copyright 2020 The go-ethereum Authors
2// This file is part of the go-ethereum library.
3//
4// The go-ethereum library is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Lesser General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// The go-ethereum library is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Lesser General Public License for more details.
13//
14// You should have received a copy of the GNU Lesser General Public License
15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16
17package server
18
19import (
20	"sync"
21	"time"
22
23	"github.com/ethereum/go-ethereum/common/mclock"
24	"github.com/ethereum/go-ethereum/ethdb"
25	"github.com/ethereum/go-ethereum/les/utils"
26	"github.com/ethereum/go-ethereum/p2p/enode"
27	"github.com/ethereum/go-ethereum/p2p/enr"
28	"github.com/ethereum/go-ethereum/p2p/nodestate"
29)
30
31const (
32	posThreshold             = 1000000         // minimum positive balance that is persisted in the database
33	negThreshold             = 1000000         // minimum negative balance that is persisted in the database
34	persistExpirationRefresh = time.Minute * 5 // refresh period of the token expiration persistence
35)
36
37// balanceTracker tracks positive and negative balances for connected nodes.
38// After clientField is set externally, a nodeBalance is created and previous
39// balance values are loaded from the database. Both balances are exponentially expired
40// values. Costs are deducted from the positive balance if present, otherwise added to
41// the negative balance. If the capacity is non-zero then a time cost is applied
42// continuously while individual request costs are applied immediately.
43// The two balances are translated into a single priority value that also depends
44// on the actual capacity.
45type balanceTracker struct {
46	setup          *serverSetup
47	clock          mclock.Clock
48	lock           sync.Mutex
49	ns             *nodestate.NodeStateMachine
50	ndb            *nodeDB
51	posExp, negExp utils.ValueExpirer
52
53	posExpTC, negExpTC                   uint64
54	defaultPosFactors, defaultNegFactors PriceFactors
55
56	active, inactive utils.ExpiredValue
57	balanceTimer     *utils.UpdateTimer
58	quit             chan struct{}
59}
60
61// newBalanceTracker creates a new balanceTracker
62func newBalanceTracker(ns *nodestate.NodeStateMachine, setup *serverSetup, db ethdb.KeyValueStore, clock mclock.Clock, posExp, negExp utils.ValueExpirer) *balanceTracker {
63	ndb := newNodeDB(db, clock)
64	bt := &balanceTracker{
65		ns:           ns,
66		setup:        setup,
67		ndb:          ndb,
68		clock:        clock,
69		posExp:       posExp,
70		negExp:       negExp,
71		balanceTimer: utils.NewUpdateTimer(clock, time.Second*10),
72		quit:         make(chan struct{}),
73	}
74	posOffset, negOffset := bt.ndb.getExpiration()
75	posExp.SetLogOffset(clock.Now(), posOffset)
76	negExp.SetLogOffset(clock.Now(), negOffset)
77
78	// Load all persisted balance entries of priority nodes,
79	// calculate the total number of issued service tokens.
80	bt.ndb.forEachBalance(false, func(id enode.ID, balance utils.ExpiredValue) bool {
81		bt.inactive.AddExp(balance)
82		return true
83	})
84
85	ns.SubscribeField(bt.setup.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
86		n, _ := ns.GetField(node, bt.setup.balanceField).(*nodeBalance)
87		if n == nil {
88			return
89		}
90
91		ov, _ := oldValue.(uint64)
92		nv, _ := newValue.(uint64)
93		if ov == 0 && nv != 0 {
94			n.activate()
95		}
96		if nv != 0 {
97			n.setCapacity(nv)
98		}
99		if ov != 0 && nv == 0 {
100			n.deactivate()
101		}
102	})
103	ns.SubscribeField(bt.setup.clientField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
104		type peer interface {
105			FreeClientId() string
106		}
107		if newValue != nil {
108			n := bt.newNodeBalance(node, newValue.(peer).FreeClientId(), true)
109			bt.lock.Lock()
110			n.SetPriceFactors(bt.defaultPosFactors, bt.defaultNegFactors)
111			bt.lock.Unlock()
112			ns.SetFieldSub(node, bt.setup.balanceField, n)
113		} else {
114			ns.SetStateSub(node, nodestate.Flags{}, bt.setup.priorityFlag, 0)
115			if b, _ := ns.GetField(node, bt.setup.balanceField).(*nodeBalance); b != nil {
116				b.deactivate()
117			}
118			ns.SetFieldSub(node, bt.setup.balanceField, nil)
119		}
120	})
121
122	// The positive and negative balances of clients are stored in database
123	// and both of these decay exponentially over time. Delete them if the
124	// value is small enough.
125	bt.ndb.evictCallBack = bt.canDropBalance
126
127	go func() {
128		for {
129			select {
130			case <-clock.After(persistExpirationRefresh):
131				now := clock.Now()
132				bt.ndb.setExpiration(posExp.LogOffset(now), negExp.LogOffset(now))
133			case <-bt.quit:
134				return
135			}
136		}
137	}()
138	return bt
139}
140
141// Stop saves expiration offset and unsaved node balances and shuts balanceTracker down
142func (bt *balanceTracker) stop() {
143	now := bt.clock.Now()
144	bt.ndb.setExpiration(bt.posExp.LogOffset(now), bt.negExp.LogOffset(now))
145	close(bt.quit)
146	bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
147		if n, ok := bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance); ok {
148			n.lock.Lock()
149			n.storeBalance(true, true)
150			n.lock.Unlock()
151			bt.ns.SetField(node, bt.setup.balanceField, nil)
152		}
153	})
154	bt.ndb.close()
155}
156
157// TotalTokenAmount returns the current total amount of service tokens in existence
158func (bt *balanceTracker) TotalTokenAmount() uint64 {
159	bt.lock.Lock()
160	defer bt.lock.Unlock()
161
162	bt.balanceTimer.Update(func(_ time.Duration) bool {
163		bt.active = utils.ExpiredValue{}
164		bt.ns.ForEach(nodestate.Flags{}, nodestate.Flags{}, func(node *enode.Node, state nodestate.Flags) {
165			if n, ok := bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance); ok && n.active {
166				pos, _ := n.GetRawBalance()
167				bt.active.AddExp(pos)
168			}
169		})
170		return true
171	})
172	total := bt.active
173	total.AddExp(bt.inactive)
174	return total.Value(bt.posExp.LogOffset(bt.clock.Now()))
175}
176
177// GetPosBalanceIDs lists node IDs with an associated positive balance
178func (bt *balanceTracker) GetPosBalanceIDs(start, stop enode.ID, maxCount int) (result []enode.ID) {
179	return bt.ndb.getPosBalanceIDs(start, stop, maxCount)
180}
181
182// SetDefaultFactors sets the default price factors applied to subsequently connected clients
183func (bt *balanceTracker) SetDefaultFactors(posFactors, negFactors PriceFactors) {
184	bt.lock.Lock()
185	bt.defaultPosFactors = posFactors
186	bt.defaultNegFactors = negFactors
187	bt.lock.Unlock()
188}
189
190// SetExpirationTCs sets positive and negative token expiration time constants.
191// Specified in seconds, 0 means infinite (no expiration).
192func (bt *balanceTracker) SetExpirationTCs(pos, neg uint64) {
193	bt.lock.Lock()
194	defer bt.lock.Unlock()
195
196	bt.posExpTC, bt.negExpTC = pos, neg
197	now := bt.clock.Now()
198	if pos > 0 {
199		bt.posExp.SetRate(now, 1/float64(pos*uint64(time.Second)))
200	} else {
201		bt.posExp.SetRate(now, 0)
202	}
203	if neg > 0 {
204		bt.negExp.SetRate(now, 1/float64(neg*uint64(time.Second)))
205	} else {
206		bt.negExp.SetRate(now, 0)
207	}
208}
209
210// GetExpirationTCs returns the current positive and negative token expiration
211// time constants
212func (bt *balanceTracker) GetExpirationTCs() (pos, neg uint64) {
213	bt.lock.Lock()
214	defer bt.lock.Unlock()
215
216	return bt.posExpTC, bt.negExpTC
217}
218
219// BalanceOperation allows atomic operations on the balance of a node regardless of whether
220// it is currently connected or not
221func (bt *balanceTracker) BalanceOperation(id enode.ID, connAddress string, cb func(AtomicBalanceOperator)) {
222	bt.ns.Operation(func() {
223		var nb *nodeBalance
224		if node := bt.ns.GetNode(id); node != nil {
225			nb, _ = bt.ns.GetField(node, bt.setup.balanceField).(*nodeBalance)
226		}
227		if nb == nil {
228			node := enode.SignNull(&enr.Record{}, id)
229			nb = bt.newNodeBalance(node, connAddress, false)
230		}
231		cb(nb)
232	})
233}
234
235// newNodeBalance loads balances from the database and creates a nodeBalance instance
236// for the given node. It also sets the priorityFlag and adds balanceCallbackZero if
237// the node has a positive balance.
238// Note: this function should run inside a NodeStateMachine operation
239func (bt *balanceTracker) newNodeBalance(node *enode.Node, connAddress string, setFlags bool) *nodeBalance {
240	pb := bt.ndb.getOrNewBalance(node.ID().Bytes(), false)
241	nb := bt.ndb.getOrNewBalance([]byte(connAddress), true)
242	n := &nodeBalance{
243		bt:          bt,
244		node:        node,
245		setFlags:    setFlags,
246		connAddress: connAddress,
247		balance:     balance{pos: pb, neg: nb, posExp: bt.posExp, negExp: bt.negExp},
248		initTime:    bt.clock.Now(),
249		lastUpdate:  bt.clock.Now(),
250	}
251	for i := range n.callbackIndex {
252		n.callbackIndex[i] = -1
253	}
254	if setFlags && n.checkPriorityStatus() {
255		n.bt.ns.SetStateSub(n.node, n.bt.setup.priorityFlag, nodestate.Flags{}, 0)
256	}
257	return n
258}
259
260// storeBalance stores either a positive or a negative balance in the database
261func (bt *balanceTracker) storeBalance(id []byte, neg bool, value utils.ExpiredValue) {
262	if bt.canDropBalance(bt.clock.Now(), neg, value) {
263		bt.ndb.delBalance(id, neg) // balance is small enough, drop it directly.
264	} else {
265		bt.ndb.setBalance(id, neg, value)
266	}
267}
268
269// canDropBalance tells whether a positive or negative balance is below the threshold
270// and therefore can be dropped from the database
271func (bt *balanceTracker) canDropBalance(now mclock.AbsTime, neg bool, b utils.ExpiredValue) bool {
272	if neg {
273		return b.Value(bt.negExp.LogOffset(now)) <= negThreshold
274	}
275	return b.Value(bt.posExp.LogOffset(now)) <= posThreshold
276}
277
278// updateTotalBalance adjusts the total balance after executing given callback.
279func (bt *balanceTracker) updateTotalBalance(n *nodeBalance, callback func() bool) {
280	bt.lock.Lock()
281	defer bt.lock.Unlock()
282
283	n.lock.Lock()
284	defer n.lock.Unlock()
285
286	original, active := n.balance.pos, n.active
287	if !callback() {
288		return
289	}
290	if active {
291		bt.active.SubExp(original)
292	} else {
293		bt.inactive.SubExp(original)
294	}
295	if n.active {
296		bt.active.AddExp(n.balance.pos)
297	} else {
298		bt.inactive.AddExp(n.balance.pos)
299	}
300}
301