// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package server

import (
	"errors"
	"sync"
	"time"

	"github.com/ethereum/go-ethereum/common/mclock"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/les/utils"
	"github.com/ethereum/go-ethereum/les/vflux"
	"github.com/ethereum/go-ethereum/log"
	"github.com/ethereum/go-ethereum/p2p/enode"
	"github.com/ethereum/go-ethereum/p2p/nodestate"
	"github.com/ethereum/go-ethereum/rlp"
)

var (
	ErrNotConnected    = errors.New("client not connected")
	ErrNoPriority      = errors.New("priority too low to raise capacity")
	ErrCantFindMaximum = errors.New("unable to find maximum allowed capacity")
)

// ClientPool implements a client database that assigns a priority to each client
// based on a positive and negative balance. Positive balance is externally assigned
// to prioritized clients and is decreased with connection time and processed
// requests (unless the price factors are zero). If the positive balance is zero
// then negative balance is accumulated.
//
// Balance tracking and priority calculation for connected clients is done by
// balanceTracker. The priorityPool ensures that clients with the lowest positive or
// highest negative balance get evicted when the total capacity allowance is full
// and new clients with a better balance want to connect.
//
// Already connected nodes receive a small bias in their favor in order to avoid
// accepting and instantly kicking out clients. In theory, we try to ensure that
// each client can have several minutes of connection time.
//
// Balances of disconnected clients are stored in nodeDB, including both the positive
// and the negative balance. Both balances decay exponentially over time; once a
// balance drops low enough, the record is dropped.
type ClientPool struct {
	*priorityPool
	*balanceTracker

	setup  *serverSetup
	clock  mclock.Clock
	closed bool
	ns     *nodestate.NodeStateMachine
	synced func() bool

	lock          sync.RWMutex
	connectedBias time.Duration

	minCap     uint64      // the minimal capacity value allowed for any client
	capReqNode *enode.Node // node that is requesting capacity change; only used inside NSM operation
}

// clientPeer represents a peer in the client pool. None of the callbacks should block.
type clientPeer interface {
	Node() *enode.Node
	FreeClientId() string                         // unique id for non-priority clients (typically a prefix of the network address)
	InactiveAllowance() time.Duration             // disconnection timeout for inactive non-priority peers
	UpdateCapacity(newCap uint64, requested bool) // signals a capacity update (requested is true if it is the result of a SetCapacity call on the given peer)
	Disconnect()                                  // initiates disconnection (Unregister should always be called)
}
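
// exampleClient is a hypothetical, minimal clientPeer implementation included here only
// to illustrate what the pool expects from a peer; real implementations are provided by
// the les package. Note that none of the callbacks may block.
type exampleClient struct {
	node    *enode.Node
	freeID  string // typically derived from the remote network address
	lastCap uint64
}

func (c *exampleClient) Node() *enode.Node                { return c.node }
func (c *exampleClient) FreeClientId() string             { return c.freeID }
func (c *exampleClient) InactiveAllowance() time.Duration { return time.Minute } // assumed timeout, illustration only

// UpdateCapacity records the new capacity; requested reports whether the update is the
// direct result of a SetCapacity call on this peer.
func (c *exampleClient) UpdateCapacity(newCap uint64, requested bool) { c.lastCap = newCap }

// Disconnect would tear down the underlying connection; Unregister should still be called.
func (c *exampleClient) Disconnect() {}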

// NewClientPool creates a new client pool
func NewClientPool(balanceDb ethdb.KeyValueStore, minCap uint64, connectedBias time.Duration, clock mclock.Clock, synced func() bool) *ClientPool {
	setup := newServerSetup()
	ns := nodestate.NewNodeStateMachine(nil, nil, clock, setup.setup)
	cp := &ClientPool{
		priorityPool:   newPriorityPool(ns, setup, clock, minCap, connectedBias, 4, 100),
		balanceTracker: newBalanceTracker(ns, setup, balanceDb, clock, &utils.Expirer{}, &utils.Expirer{}),
		setup:          setup,
		ns:             ns,
		clock:          clock,
		minCap:         minCap,
		connectedBias:  connectedBias,
		synced:         synced,
	}

	ns.SubscribeState(nodestate.MergeFlags(setup.activeFlag, setup.inactiveFlag, setup.priorityFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
		if newState.Equals(setup.inactiveFlag) {
			// set timeout for non-priority inactive client
			var timeout time.Duration
			if c, ok := ns.GetField(node, setup.clientField).(clientPeer); ok {
				timeout = c.InactiveAllowance()
			}
			ns.AddTimeout(node, setup.inactiveFlag, timeout)
		}
		if oldState.Equals(setup.inactiveFlag) && newState.Equals(setup.inactiveFlag.Or(setup.priorityFlag)) {
			ns.SetStateSub(node, setup.inactiveFlag, nodestate.Flags{}, 0) // priority gained; remove timeout
		}
		if newState.Equals(setup.activeFlag) {
			// active with no priority; limit capacity to minCap
			cap, _ := ns.GetField(node, setup.capacityField).(uint64)
			if cap > minCap {
				cp.requestCapacity(node, minCap, minCap, 0)
			}
		}
		if newState.Equals(nodestate.Flags{}) {
			if c, ok := ns.GetField(node, setup.clientField).(clientPeer); ok {
				c.Disconnect()
			}
		}
	})

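	// forward capacity field changes to the registered peer via its UpdateCapacity callback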
	ns.SubscribeField(setup.capacityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
		if c, ok := ns.GetField(node, setup.clientField).(clientPeer); ok {
			newCap, _ := newValue.(uint64)
			c.UpdateCapacity(newCap, node == cp.capReqNode)
		}
	})

	// add metrics
	cp.ns.SubscribeState(nodestate.MergeFlags(cp.setup.activeFlag, cp.setup.inactiveFlag), func(node *enode.Node, oldState, newState nodestate.Flags) {
		if oldState.IsEmpty() && !newState.IsEmpty() {
			clientConnectedMeter.Mark(1)
		}
		if !oldState.IsEmpty() && newState.IsEmpty() {
			clientDisconnectedMeter.Mark(1)
		}
		if oldState.HasNone(cp.setup.activeFlag) && newState.HasAll(cp.setup.activeFlag) {
			clientActivatedMeter.Mark(1)
		}
		if oldState.HasAll(cp.setup.activeFlag) && newState.HasNone(cp.setup.activeFlag) {
			clientDeactivatedMeter.Mark(1)
		}
		activeCount, activeCap := cp.Active()
		totalActiveCountGauge.Update(int64(activeCount))
		totalActiveCapacityGauge.Update(int64(activeCap))
		totalInactiveCountGauge.Update(int64(cp.Inactive()))
	})
	return cp
}
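
// exampleSetup is a hypothetical sketch (not used by the library) of the intended
// ClientPool lifecycle: construct the pool, Start it, Register peers, Unregister them
// when they disconnect and Stop on shutdown. The minimum capacity and connection bias
// values are arbitrary.
func exampleSetup(db ethdb.KeyValueStore, clock mclock.Clock, peer clientPeer) {
	pool := NewClientPool(db, 1, 3*time.Second, clock, func() bool { return true })
	pool.Start()
	defer pool.Stop()

	// Register returns the peer's ConnectedBalance, which can be used to credit service tokens
	balance := pool.Register(peer)
	_ = balance

	pool.Unregister(peer)
}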

// Start starts the client pool. Should be called before Register/Unregister.
func (cp *ClientPool) Start() {
	cp.ns.Start()
}

// Stop shuts the client pool down. The clientPeer interface callbacks will not be called
// after Stop. Register calls will return nil.
func (cp *ClientPool) Stop() {
	cp.balanceTracker.stop()
	cp.ns.Stop()
}

// Register registers the peer into the client pool. If the peer has insufficient
// priority and remains inactive for longer than the allowed timeout then it will be
// disconnected by calling the Disconnect function of the clientPeer interface.
func (cp *ClientPool) Register(peer clientPeer) ConnectedBalance {
	cp.ns.SetField(peer.Node(), cp.setup.clientField, peerWrapper{peer})
	balance, _ := cp.ns.GetField(peer.Node(), cp.setup.balanceField).(*nodeBalance)
	return balance
}

// Unregister removes the peer from the client pool
func (cp *ClientPool) Unregister(peer clientPeer) {
	cp.ns.SetField(peer.Node(), cp.setup.clientField, nil)
}

// SetConnectedBias sets the connection bias, which is applied to already connected clients
// so that they are not kicked out right after connecting and every connected client gets
// enough time to request or sync some data.
func (cp *ClientPool) SetConnectedBias(bias time.Duration) {
	cp.lock.Lock()
	cp.connectedBias = bias
	cp.setActiveBias(bias)
	cp.lock.Unlock()
}

// SetCapacity sets the assigned capacity of a connected client
func (cp *ClientPool) SetCapacity(node *enode.Node, reqCap uint64, bias time.Duration, requested bool) (capacity uint64, err error) {
	cp.lock.RLock()
	if cp.connectedBias > bias {
		bias = cp.connectedBias
	}
	cp.lock.RUnlock()

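	// perform the whole adjustment inside a single NodeStateMachine operation so that the
	// balance, capacity and state reads below stay consistent with each other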
	cp.ns.Operation(func() {
		balance, _ := cp.ns.GetField(node, cp.setup.balanceField).(*nodeBalance)
		if balance == nil {
			err = ErrNotConnected
			return
		}
		capacity, _ = cp.ns.GetField(node, cp.setup.capacityField).(uint64)
		if capacity == 0 {
			// if the client is inactive then it has insufficient priority for the minimal capacity
			// (will be activated automatically with minCap when possible)
			return
		}
		if reqCap < cp.minCap {
			// can't request less than minCap; switching between 0 (inactive state) and minCap is
			// performed by the server automatically as soon as necessary/possible
			reqCap = cp.minCap
		}
		if reqCap > cp.minCap && cp.ns.GetState(node).HasNone(cp.setup.priorityFlag) {
			err = ErrNoPriority
			return
		}
		if reqCap == capacity {
			return
		}
		if requested {
			// mark the requested node so that the UpdateCapacity callback can signal
			// whether the update is the direct result of a SetCapacity call on the given node
			cp.capReqNode = node
			defer func() {
				cp.capReqNode = nil
			}()
		}

		var minTarget, maxTarget uint64
		if reqCap > capacity {
			// Estimate maximum available capacity at the current priority level and request
			// the estimated amount.
			// Note: requestCapacity could find the highest available capacity between the
			// current and the requested capacity but it could cost a lot of iterations with
			// fine step adjustment if the requested capacity is very high. By doing a quick
			// estimation of the maximum available capacity based on the capacity curve we
			// can limit the number of required iterations.
			curve := cp.getCapacityCurve().exclude(node.ID())
			maxTarget = curve.maxCapacity(func(capacity uint64) int64 {
				return balance.estimatePriority(capacity, 0, 0, bias, false)
			})
			if maxTarget < reqCap {
				return
			}
			maxTarget = reqCap

			// Specify a narrow target range that allows a limited number of fine step
			// iterations
			minTarget = maxTarget - maxTarget/20
			if minTarget < capacity {
				minTarget = capacity
			}
		} else {
			minTarget, maxTarget = reqCap, reqCap
		}
		if newCap := cp.requestCapacity(node, minTarget, maxTarget, bias); newCap >= minTarget && newCap <= maxTarget {
			capacity = newCap
			return
		}
		// we should be able to find the maximum allowed capacity in a few iterations
		log.Error("Unable to find maximum allowed capacity")
		err = ErrCantFindMaximum
	})
	return
}
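
// exampleRaiseCapacity is a hypothetical sketch (not part of the library) of how a caller
// might try to raise the capacity of a registered client. A zero bias lets the pool apply
// its configured connectedBias as the minimum.
func exampleRaiseCapacity(pool *ClientPool, node *enode.Node, target uint64) uint64 {
	granted, err := pool.SetCapacity(node, target, 0, true)
	if err != nil {
		// ErrNotConnected, ErrNoPriority or ErrCantFindMaximum; granted reports the capacity
		// the client still has (zero if it is not connected)
		log.Debug("Capacity raise rejected", "node", node.ID(), "err", err)
	}
	return granted
}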

// serveCapQuery serves a vflux capacity query. It receives multiple token amount values
// and a bias time value. For each given token amount it calculates the maximum achievable
// capacity in case the amount is added to the balance.
func (cp *ClientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte {
	var req vflux.CapacityQueryReq
	if rlp.DecodeBytes(data, &req) != nil {
		return nil
	}
	if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
		return nil
	}
	result := make(vflux.CapacityQueryReply, len(req.AddTokens))
	if !cp.synced() {
		capacityQueryZeroMeter.Mark(1)
		reply, _ := rlp.EncodeToBytes(&result)
		return reply
	}

	bias := time.Second * time.Duration(req.Bias)
	cp.lock.RLock()
	if cp.connectedBias > bias {
		bias = cp.connectedBias
	}
	cp.lock.RUnlock()

	// use capacityCurve to answer request for multiple newly bought token amounts
	curve := cp.getCapacityCurve().exclude(id)
	cp.BalanceOperation(id, freeID, func(balance AtomicBalanceOperator) {
		pb, _ := balance.GetBalance()
		for i, addTokens := range req.AddTokens {
			add := addTokens.Int64()
			result[i] = curve.maxCapacity(func(capacity uint64) int64 {
				return balance.estimatePriority(capacity, add, 0, bias, false) / int64(capacity)
			})
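			// a removal of the entire positive balance (or more) cannot guarantee more than the
			// minimal capacity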
			if add <= 0 && uint64(-add) >= pb && result[i] > cp.minCap {
				result[i] = cp.minCap
			}
			if result[i] < cp.minCap {
				result[i] = 0
			}
		}
	})
	// add first result to metrics (don't care about priority client multi-queries yet)
	if result[0] == 0 {
		capacityQueryZeroMeter.Mark(1)
	} else {
		capacityQueryNonZeroMeter.Mark(1)
	}
	reply, _ := rlp.EncodeToBytes(&result)
	return reply
}

// Handle implements Service
func (cp *ClientPool) Handle(id enode.ID, address string, name string, data []byte) []byte {
	switch name {
	case vflux.CapacityQueryName:
		return cp.serveCapQuery(id, address, data)
	default:
		return nil
	}
}