1// Copyright 2016 The go-ethereum Authors
2// This file is part of the go-ethereum library.
3//
4// The go-ethereum library is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Lesser General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// The go-ethereum library is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Lesser General Public License for more details.
13//
14// You should have received a copy of the GNU Lesser General Public License
15// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16
17// Package flowcontrol implements a client side flow control mechanism
18package flowcontrol
19
20import (
21	"fmt"
22	"math"
23	"sync"
24	"time"
25
26	"github.com/ethereum/go-ethereum/common/mclock"
27	"github.com/ethereum/go-ethereum/log"
28)
29
const (
	// fcTimeConst is the time unit MinRecharge refers to: while a buffer is
	// recharging linearly, MinRecharge units are added per fcTimeConst elapsed.
	fcTimeConst = time.Millisecond
	// DecParamDelay is the delay the server side applies before a capacity
	// decrease takes effect, so that requests the client sent before it
	// received the capacity update announcement do not trigger a buffer
	// underrun error.
	DecParamDelay = 2 * time.Second
	// keepLogs is how long event logs are kept; a zero value disables logging.
	keepLogs = 0
)
41
// ServerParams holds the flow control parameters a server specifies for a
// client.
//
// Note: a server can assign different amounts of capacity to each client by
// giving them different parameters.
type ServerParams struct {
	// BufLimit is the upper bound of the flow control buffer.
	BufLimit uint64
	// MinRecharge is the amount added to the buffer per fcTimeConst while
	// the buffer is recharging.
	MinRecharge uint64
}
49
// scheduledUpdate represents a delayed flow control parameter update. Entries
// are queued in ClientNode.updateSchedule and applied by ClientNode.update
// once their time has been reached.
type scheduledUpdate struct {
	time   mclock.AbsTime // time at which the update becomes due
	params ServerParams   // parameters to switch to at that time
}
55
// ClientNode is the flow control system's representation of a client
// (used in server mode only)
type ClientNode struct {
	params         ServerParams      // flow control parameters currently assigned to the client
	bufValue       int64             // buffer value as of lastTime; can be negative internally
	lastTime       mclock.AbsTime    // time of the latest bufValue recalculation
	updateSchedule []scheduledUpdate // delayed parameter updates, applied front to back once due
	sumCost        uint64            // sum of req costs received from this client
	accepted       map[uint64]uint64 // value = sumCost after accepting the given req
	connected      bool              // true from creation until Disconnect is called
	lock           sync.Mutex        // taken by the exported methods in this file before touching the node
	cm             *ClientManager    // manager the node registered with in NewClientNode
	log            *logger           // event log; nil unless keepLogs > 0
	// cmNodeFields is declared elsewhere in the package; presumably it holds
	// the per-node state kept by ClientManager — confirm against its definition.
	cmNodeFields
}
71
72// NewClientNode returns a new ClientNode
73func NewClientNode(cm *ClientManager, params ServerParams) *ClientNode {
74	node := &ClientNode{
75		cm:        cm,
76		params:    params,
77		bufValue:  int64(params.BufLimit),
78		lastTime:  cm.clock.Now(),
79		accepted:  make(map[uint64]uint64),
80		connected: true,
81	}
82	if keepLogs > 0 {
83		node.log = newLogger(keepLogs)
84	}
85	cm.connect(node)
86	return node
87}
88
89// Disconnect should be called when a client is disconnected
90func (node *ClientNode) Disconnect() {
91	node.lock.Lock()
92	defer node.lock.Unlock()
93
94	node.connected = false
95	node.cm.disconnect(node)
96}
97
98// BufferStatus returns the current buffer value and limit
99func (node *ClientNode) BufferStatus() (uint64, uint64) {
100	node.lock.Lock()
101	defer node.lock.Unlock()
102
103	if !node.connected {
104		return 0, 0
105	}
106	now := node.cm.clock.Now()
107	node.update(now)
108	node.cm.updateBuffer(node, 0, now)
109	bv := node.bufValue
110	if bv < 0 {
111		bv = 0
112	}
113	return uint64(bv), node.params.BufLimit
114}
115
116// OneTimeCost subtracts the given amount from the node's buffer.
117//
118// Note: this call can take the buffer into the negative region internally.
119// In this case zero buffer value is returned by exported calls and no requests
120// are accepted.
121func (node *ClientNode) OneTimeCost(cost uint64) {
122	node.lock.Lock()
123	defer node.lock.Unlock()
124
125	now := node.cm.clock.Now()
126	node.update(now)
127	node.bufValue -= int64(cost)
128	node.cm.updateBuffer(node, -int64(cost), now)
129}
130
131// Freeze notifies the client manager about a client freeze event in which case
132// the total capacity allowance is slightly reduced.
133func (node *ClientNode) Freeze() {
134	node.lock.Lock()
135	frozenCap := node.params.MinRecharge
136	node.lock.Unlock()
137	node.cm.reduceTotalCapacity(frozenCap)
138}
139
140// update recalculates the buffer value at a specified time while also performing
141// scheduled flow control parameter updates if necessary
142func (node *ClientNode) update(now mclock.AbsTime) {
143	for len(node.updateSchedule) > 0 && node.updateSchedule[0].time <= now {
144		node.recalcBV(node.updateSchedule[0].time)
145		node.updateParams(node.updateSchedule[0].params, now)
146		node.updateSchedule = node.updateSchedule[1:]
147	}
148	node.recalcBV(now)
149}
150
151// recalcBV recalculates the buffer value at a specified time
152func (node *ClientNode) recalcBV(now mclock.AbsTime) {
153	dt := uint64(now - node.lastTime)
154	if now < node.lastTime {
155		dt = 0
156	}
157	node.bufValue += int64(node.params.MinRecharge * dt / uint64(fcTimeConst))
158	if node.bufValue > int64(node.params.BufLimit) {
159		node.bufValue = int64(node.params.BufLimit)
160	}
161	if node.log != nil {
162		node.log.add(now, fmt.Sprintf("updated  bv=%d  MRR=%d  BufLimit=%d", node.bufValue, node.params.MinRecharge, node.params.BufLimit))
163	}
164	node.lastTime = now
165}
166
167// UpdateParams updates the flow control parameters of a client node
168func (node *ClientNode) UpdateParams(params ServerParams) {
169	node.lock.Lock()
170	defer node.lock.Unlock()
171
172	now := node.cm.clock.Now()
173	node.update(now)
174	if params.MinRecharge >= node.params.MinRecharge {
175		node.updateSchedule = nil
176		node.updateParams(params, now)
177	} else {
178		for i, s := range node.updateSchedule {
179			if params.MinRecharge >= s.params.MinRecharge {
180				s.params = params
181				node.updateSchedule = node.updateSchedule[:i+1]
182				return
183			}
184		}
185		node.updateSchedule = append(node.updateSchedule, scheduledUpdate{time: now + mclock.AbsTime(DecParamDelay), params: params})
186	}
187}
188
189// updateParams updates the flow control parameters of the node
190func (node *ClientNode) updateParams(params ServerParams, now mclock.AbsTime) {
191	diff := int64(params.BufLimit - node.params.BufLimit)
192	if diff > 0 {
193		node.bufValue += diff
194	} else if node.bufValue > int64(params.BufLimit) {
195		node.bufValue = int64(params.BufLimit)
196	}
197	node.cm.updateParams(node, params, now)
198}
199
200// AcceptRequest returns whether a new request can be accepted and the missing
201// buffer amount if it was rejected due to a buffer underrun. If accepted, maxCost
202// is deducted from the flow control buffer.
203func (node *ClientNode) AcceptRequest(reqID, index, maxCost uint64) (accepted bool, bufShort uint64, priority int64) {
204	node.lock.Lock()
205	defer node.lock.Unlock()
206
207	now := node.cm.clock.Now()
208	node.update(now)
209	if int64(maxCost) > node.bufValue {
210		if node.log != nil {
211			node.log.add(now, fmt.Sprintf("rejected  reqID=%d  bv=%d  maxCost=%d", reqID, node.bufValue, maxCost))
212			node.log.dump(now)
213		}
214		return false, maxCost - uint64(node.bufValue), 0
215	}
216	node.bufValue -= int64(maxCost)
217	node.sumCost += maxCost
218	if node.log != nil {
219		node.log.add(now, fmt.Sprintf("accepted  reqID=%d  bv=%d  maxCost=%d  sumCost=%d", reqID, node.bufValue, maxCost, node.sumCost))
220	}
221	node.accepted[index] = node.sumCost
222	return true, 0, node.cm.accepted(node, maxCost, now)
223}
224
225// RequestProcessed should be called when the request has been processed
226func (node *ClientNode) RequestProcessed(reqID, index, maxCost, realCost uint64) uint64 {
227	node.lock.Lock()
228	defer node.lock.Unlock()
229
230	now := node.cm.clock.Now()
231	node.update(now)
232	node.cm.processed(node, maxCost, realCost, now)
233	bv := node.bufValue + int64(node.sumCost-node.accepted[index])
234	if node.log != nil {
235		node.log.add(now, fmt.Sprintf("processed  reqID=%d  bv=%d  maxCost=%d  realCost=%d  sumCost=%d  oldSumCost=%d  reportedBV=%d", reqID, node.bufValue, maxCost, realCost, node.sumCost, node.accepted[index], bv))
236	}
237	delete(node.accepted, index)
238	if bv < 0 {
239		return 0
240	}
241	return uint64(bv)
242}
243
// ServerNode is the flow control system's representation of a server
// (used in client mode only)
type ServerNode struct {
	clock       mclock.Clock      // time source supplied to NewServerNode
	bufEstimate uint64            // lowest estimate of the client's buffer value at the server
	bufRecharge bool              // when set, recalcBLE adds recharge to bufEstimate over time
	lastTime    mclock.AbsTime    // time of the latest estimate recalculation or adjustment
	params      ServerParams      // flow control parameters announced by the server
	sumCost     uint64            // sum of req costs sent to this server
	pending     map[uint64]uint64 // value = sumCost after sending the given req
	log         *logger           // event log; nil unless keepLogs > 0
	lock        sync.RWMutex      // taken by the exported methods before accessing the fields above
}
257
258// NewServerNode returns a new ServerNode
259func NewServerNode(params ServerParams, clock mclock.Clock) *ServerNode {
260	node := &ServerNode{
261		clock:       clock,
262		bufEstimate: params.BufLimit,
263		bufRecharge: false,
264		lastTime:    clock.Now(),
265		params:      params,
266		pending:     make(map[uint64]uint64),
267	}
268	if keepLogs > 0 {
269		node.log = newLogger(keepLogs)
270	}
271	return node
272}
273
274// UpdateParams updates the flow control parameters of the node
275func (node *ServerNode) UpdateParams(params ServerParams) {
276	node.lock.Lock()
277	defer node.lock.Unlock()
278
279	node.recalcBLE(mclock.Now())
280	if params.BufLimit > node.params.BufLimit {
281		node.bufEstimate += params.BufLimit - node.params.BufLimit
282	} else {
283		if node.bufEstimate > params.BufLimit {
284			node.bufEstimate = params.BufLimit
285		}
286	}
287	node.params = params
288}
289
290// recalcBLE recalculates the lowest estimate for the client's buffer value at
291// the given server at the specified time
292func (node *ServerNode) recalcBLE(now mclock.AbsTime) {
293	if now < node.lastTime {
294		return
295	}
296	if node.bufRecharge {
297		dt := uint64(now - node.lastTime)
298		node.bufEstimate += node.params.MinRecharge * dt / uint64(fcTimeConst)
299		if node.bufEstimate >= node.params.BufLimit {
300			node.bufEstimate = node.params.BufLimit
301			node.bufRecharge = false
302		}
303	}
304	node.lastTime = now
305	if node.log != nil {
306		node.log.add(now, fmt.Sprintf("updated  bufEst=%d  MRR=%d  BufLimit=%d", node.bufEstimate, node.params.MinRecharge, node.params.BufLimit))
307	}
308}
309
310// safetyMargin is added to the flow control waiting time when estimated buffer value is low
311const safetyMargin = time.Millisecond
312
313// CanSend returns the minimum waiting time required before sending a request
314// with the given maximum estimated cost. Second return value is the relative
315// estimated buffer level after sending the request (divided by BufLimit).
316func (node *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) {
317	node.lock.RLock()
318	defer node.lock.RUnlock()
319
320	if node.params.BufLimit == 0 {
321		return time.Duration(math.MaxInt64), 0
322	}
323	now := node.clock.Now()
324	node.recalcBLE(now)
325	maxCost += uint64(safetyMargin) * node.params.MinRecharge / uint64(fcTimeConst)
326	if maxCost > node.params.BufLimit {
327		maxCost = node.params.BufLimit
328	}
329	if node.bufEstimate >= maxCost {
330		relBuf := float64(node.bufEstimate-maxCost) / float64(node.params.BufLimit)
331		if node.log != nil {
332			node.log.add(now, fmt.Sprintf("canSend  bufEst=%d  maxCost=%d  true  relBuf=%f", node.bufEstimate, maxCost, relBuf))
333		}
334		return 0, relBuf
335	}
336	timeLeft := time.Duration((maxCost - node.bufEstimate) * uint64(fcTimeConst) / node.params.MinRecharge)
337	if node.log != nil {
338		node.log.add(now, fmt.Sprintf("canSend  bufEst=%d  maxCost=%d  false  timeLeft=%v", node.bufEstimate, maxCost, timeLeft))
339	}
340	return timeLeft, 0
341}
342
343// QueuedRequest should be called when the request has been assigned to the given
344// server node, before putting it in the send queue. It is mandatory that requests
345// are sent in the same order as the QueuedRequest calls are made.
346func (node *ServerNode) QueuedRequest(reqID, maxCost uint64) {
347	node.lock.Lock()
348	defer node.lock.Unlock()
349
350	now := node.clock.Now()
351	node.recalcBLE(now)
352	// Note: we do not know when requests actually arrive to the server so bufRecharge
353	// is not turned on here if buffer was full; in this case it is going to be turned
354	// on by the first reply's bufValue feedback
355	if node.bufEstimate >= maxCost {
356		node.bufEstimate -= maxCost
357	} else {
358		log.Error("Queued request with insufficient buffer estimate")
359		node.bufEstimate = 0
360	}
361	node.sumCost += maxCost
362	node.pending[reqID] = node.sumCost
363	if node.log != nil {
364		node.log.add(now, fmt.Sprintf("queued  reqID=%d  bufEst=%d  maxCost=%d  sumCost=%d", reqID, node.bufEstimate, maxCost, node.sumCost))
365	}
366}
367
368// ReceivedReply adjusts estimated buffer value according to the value included in
369// the latest request reply.
370func (node *ServerNode) ReceivedReply(reqID, bv uint64) {
371	node.lock.Lock()
372	defer node.lock.Unlock()
373
374	now := node.clock.Now()
375	node.recalcBLE(now)
376	if bv > node.params.BufLimit {
377		bv = node.params.BufLimit
378	}
379	sc, ok := node.pending[reqID]
380	if !ok {
381		return
382	}
383	delete(node.pending, reqID)
384	cc := node.sumCost - sc
385	newEstimate := uint64(0)
386	if bv > cc {
387		newEstimate = bv - cc
388	}
389	if newEstimate > node.bufEstimate {
390		// Note: we never reduce the buffer estimate based on the reported value because
391		// this can only happen because of the delayed delivery of the latest reply.
392		// The lowest estimate based on the previous reply can still be considered valid.
393		node.bufEstimate = newEstimate
394	}
395
396	node.bufRecharge = node.bufEstimate < node.params.BufLimit
397	node.lastTime = now
398	if node.log != nil {
399		node.log.add(now, fmt.Sprintf("received  reqID=%d  bufEst=%d  reportedBv=%d  sumCost=%d  oldSumCost=%d", reqID, node.bufEstimate, bv, node.sumCost, sc))
400	}
401}
402
403// ResumeFreeze cleans all pending requests and sets the buffer estimate to the
404// reported value after resuming from a frozen state
405func (node *ServerNode) ResumeFreeze(bv uint64) {
406	node.lock.Lock()
407	defer node.lock.Unlock()
408
409	for reqID := range node.pending {
410		delete(node.pending, reqID)
411	}
412	now := node.clock.Now()
413	node.recalcBLE(now)
414	if bv > node.params.BufLimit {
415		bv = node.params.BufLimit
416	}
417	node.bufEstimate = bv
418	node.bufRecharge = node.bufEstimate < node.params.BufLimit
419	node.lastTime = now
420	if node.log != nil {
421		node.log.add(now, fmt.Sprintf("unfreeze  bv=%d  sumCost=%d", bv, node.sumCost))
422	}
423}
424
425// DumpLogs dumps the event log if logging is used
426func (node *ServerNode) DumpLogs() {
427	node.lock.Lock()
428	defer node.lock.Unlock()
429
430	if node.log != nil {
431		node.log.dump(node.clock.Now())
432	}
433}
434