// SPDX-License-Identifier: ISC
// Copyright (c) 2014-2020 Bitmark Inc.
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package peer

import (
	"bytes"
	"container/list"
	"fmt"
	"math/rand"
	"strings"
	"sync"
	"time"

	"github.com/bitmark-inc/bitmarkd/block"
	"github.com/bitmark-inc/bitmarkd/blockheader"
	"github.com/bitmark-inc/bitmarkd/fault"
	"github.com/bitmark-inc/bitmarkd/genesis"
	"github.com/bitmark-inc/bitmarkd/messagebus"
	"github.com/bitmark-inc/bitmarkd/mode"
	"github.com/bitmark-inc/bitmarkd/peer/upstream"
	"github.com/bitmark-inc/bitmarkd/peer/voting"
	"github.com/bitmark-inc/bitmarkd/util"
	"github.com/bitmark-inc/bitmarkd/zmqutil"
	"github.com/bitmark-inc/logger"
)
// various timeouts
const (
	// pause to limit bandwidth
	cycleInterval = 15 * time.Second

	// timeout for connections
	connectorTimeout = 60 * time.Second

	// number of cycles to remain 1 block out of sync before resynchronising
	samplingLimit = 6

	// number of blocks to fetch in one set
	fetchBlocksPerCycle = 200

	// give up fork recovery if the height difference is greater than this
	forkProtection = 60

	// do not proceed unless this many clients are connected
	minimumClients = 5

	// total number of dynamic clients
	maximumDynamicClients = 25

	// a client must have responded at least once within this duration to count as active
	activeTime = 60 * time.Second

	// fast sync options for fetching blocks
	fastSyncFetchBlocksPerCycle = 2000
	fastSyncSkipPerBlocks       = 100
	fastSyncPivotBlocks         = 1000
)

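// connector tracks the static and dynamic upstream clients together with
// the state used by the block synchronisation state machine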
type connector struct {
	sync.RWMutex

	log        *logger.L
	preferIPv6 bool

	staticClients []upstream.Upstream

	dynamicClients list.List

	state connectorState

	theClient        upstream.Upstream // client used for fetching blocks
	startBlockNumber uint64            // block number where local chain forks
	height           uint64            // block number on best node
	samples          int               // counter to detect missed block broadcast
	votes            voting.Voting

	fastSyncEnabled bool   // fast sync mode enabled?
	blocksPerCycle  int    // number of blocks to fetch per cycle
	pivotPoint      uint64 // block number to stop fast syncing
}

// initialise the connector
func (conn *connector) initialise(
	privateKey []byte,
	publicKey []byte,
	connect []Connection,
	dynamicEnabled bool,
	preferIPv6 bool,
	fastSync bool,
) error {

	log := logger.New("connector")
	conn.log = log

	conn.preferIPv6 = preferIPv6

	conn.fastSyncEnabled = fastSync

	log.Info("initialising…")

	// allocate all sockets
	staticCount := len(connect) // can be zero
	if 0 == staticCount && !dynamicEnabled {
		log.Error("zero static connections and dynamic is disabled")
		return fault.NoConnectionsAvailable
	}
	conn.staticClients = make([]upstream.Upstream, staticCount)

	// initially connect all static sockets
	wg := sync.WaitGroup{}
	errCh := make(chan error, len(connect))

	conn.log.Debugf("static connection count: %d", len(connect))

	for i, c := range connect {
		wg.Add(1)

		// start a new goroutine for each connection
		go func(conn *connector, c Connection, i int, wg *sync.WaitGroup, ch chan error) {

			// record an error and release the wait group
			errF := func(wg *sync.WaitGroup, ch chan error, e error) {
				ch <- e
				wg.Done()
			}

			// canonicalise the error with the client address
			canonicalErrF := func(c Connection, e error) error {
				return fmt.Errorf("client: %q error: %s", c.Address, e)
			}

			address, err := util.NewConnection(c.Address)
			if nil != err {
				log.Errorf("client[%d]=address: %q  error: %s", i, c.Address, err)
				errF(wg, ch, canonicalErrF(c, err))
				return
			}
			serverPublicKey, err := zmqutil.ReadPublicKey(c.PublicKey)
			if nil != err {
				log.Errorf("client[%d]=public: %q  error: %s", i, c.PublicKey, err)
				errF(wg, ch, canonicalErrF(c, err))
				return
			}

			// prevent connection to self
			if bytes.Equal(publicKey, serverPublicKey) {
				err := fault.ConnectingToSelfForbidden
				log.Errorf("client[%d]=public: %q  error: %s", i, c.PublicKey, err)
				errF(wg, ch, canonicalErrF(c, err))
				return
			}

			client, err := upstream.New(privateKey, publicKey, connectorTimeout)
			if nil != err {
				log.Errorf("client[%d]=%q  error: %s", i, address, err)
				errF(wg, ch, canonicalErrF(c, err))
				return
			}

			conn.Lock()
			conn.staticClients[i] = client
			globalData.connectorClients = append(globalData.connectorClients, client)
			conn.Unlock()

			err = client.Connect(address, serverPublicKey)
			if nil != err {
				log.Errorf("connect[%d]=%q  error: %s", i, address, err)
				errF(wg, ch, canonicalErrF(c, err))
				return
			}
			log.Infof("public key: %x  at: %q", serverPublicKey, c.Address)
			wg.Done()

		}(conn, c, i, &wg, errCh)
	}

	conn.log.Debug("waiting for all static connections...")
	wg.Wait()

	// drain the error channel to collect all errors
	errs := make([]error, 0)
	for len(errCh) > 0 {
		errs = append(errs, <-errCh)
	}

	// error variable for the fail label below
	err := error(nil)

	if len(errs) == 1 {
		err = errs[0]
		goto fail
	} else if len(errs) > 1 {
		err = compositeError(errs)
		goto fail
	}

	// just create sockets for dynamic clients
	for i := 0; i < maximumDynamicClients; i++ {
		client, e := upstream.New(privateKey, publicKey, connectorTimeout)
		if nil != e {
			log.Errorf("client[%d]  error: %s", i, e)
			err = e
			goto fail
		}

		// append to the list of dynamic clients
		conn.dynamicClients.PushBack(client)

		globalData.connectorClients = append(globalData.connectorClients, client)
	}

	conn.votes = voting.NewVoting()

	// start state machine
	conn.nextState(cStateConnecting)

	return nil

	// error handling
fail:
	conn.destroy()

	return err
}

// combine multiple errors into one
func compositeError(errors []error) error {
	if nil == errors || 0 == len(errors) {
		return nil
	}
	var ce strings.Builder
	ce.WriteString("composite error: [")
	count := len(errors)
	for i, e := range errors {
		ce.WriteString(e.Error())
		if i < count-1 {
			ce.WriteString(", ")
		}
	}
	ce.WriteString("]")
	return fmt.Errorf("%s", ce.String())
}

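// allClients runs the callback f on every non-nil static and dynamic client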
func (conn *connector) allClients(
	f func(client upstream.Upstream, e *list.Element),
) {
	for _, client := range conn.staticClients {
		if client != nil {
			f(client, nil)
		}
	}
	for e := conn.dynamicClients.Front(); nil != e; e = e.Next() {
		if client := e.Value.(upstream.Upstream); client != nil {
			f(client, e)
		}
	}
}

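// searchClients runs the callback f on each client, stopping at the first
// one for which f returns true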
func (conn *connector) searchClients(
	f func(client upstream.Upstream, e *list.Element) bool,
) {
	for _, client := range conn.staticClients {
		if f(client, nil) {
			return
		}
	}
	for e := conn.dynamicClients.Front(); nil != e; e = e.Next() {
		if f(e.Value.(upstream.Upstream), e) {
			return
		}
	}
}

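// destroy closes down every static and dynamic upstream client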
func (conn *connector) destroy() {
	conn.allClients(func(client upstream.Upstream, e *list.Element) {
		client.Destroy()
	})
}

// various RPC calls to upstream connections
func (conn *connector) Run(args interface{}, shutdown <-chan struct{}) {
	log := conn.log

	log.Info("starting…")

	queue := messagebus.Bus.Connector.Chan()

	timer := time.After(cycleInterval)

loop:
	for {
		// wait for shutdown, cycle timer or queue item
		log.Debug("waiting…")

		select {
		case <-shutdown:
			break loop
		case <-timer: // cycle timer expired: run a processing pass
			timer = time.After(cycleInterval)
			conn.process()
		case item := <-queue:
			c, _ := util.PackedConnection(item.Parameters[1]).Unpack()
			conn.log.Debugf(
				"received control: %s  public key: %x  connect: %x %q",
				item.Command,
				item.Parameters[0],
				item.Parameters[1],
				c,
			)

			switch item.Command {
			case "@D": // internal command: delete a peer
				conn.releaseServerKey(item.Parameters[0])
				conn.log.Infof(
					"connector receive server public key: %x",
					item.Parameters[0],
				)
			default:
				err := conn.connectUpstream(
					item.Command,
					item.Parameters[0],
					item.Parameters[1],
				)
				if nil != err {
					conn.log.Warnf("connect upstream error: %s", err)
				}
			}
		}
	}
	log.Info("shutting down…")
	conn.destroy()
	log.Info("stopped")
}

// process a connector cycle by running the state machine until it pauses
func (conn *connector) process() {
	// run the machine until it pauses
	for conn.runStateMachine() {
	}
}

// run state machine
// returns:
//   true  if more cycles are wanted
//   false to pause for I/O
func (conn *connector) runStateMachine() bool {
	log := conn.log

	log.Infof("current state: %s", conn.state)

	continueLooping := true

	switch conn.state {
	case cStateConnecting:
		mode.Set(mode.Resynchronise)
		globalData.clientCount = conn.getConnectedClientCount()
		log.Infof("connections: %d", globalData.clientCount)

		if isConnectionEnough(globalData.clientCount) {
			conn.nextState(cStateHighestBlock)
		} else {
			log.Warnf("connections: %d below minimum client count: %d", globalData.clientCount, minimumClients)
			messagebus.Bus.Announce.Send("reconnect")
		}
		continueLooping = false

	case cStateHighestBlock:
		if conn.updateHeightAndClient() {
			log.Infof("highest block number: %d  client: %s", conn.height, conn.theClient.Name())
			if conn.hasBetterChain(blockheader.Height()) {
				log.Infof("new chain from %s, height %d, digest %s", conn.theClient.Name(), conn.height, conn.theClient.CachedRemoteDigestOfLocalHeight().String())
				log.Info("enter fork detect state")
				conn.nextState(cStateForkDetect)
			} else if conn.isSameChain() {
				log.Info("remote same chain")
				conn.nextState(cStateRebuild)
			} else {
				log.Info("remote chain invalid, stop looping for now")
				continueLooping = false
			}
		} else {
			log.Warn("highest block: connection lost")
			conn.nextState(cStateConnecting)
			continueLooping = false
		}

	case cStateForkDetect:
		height := blockheader.Height()
		if !conn.hasBetterChain(height) {
			log.Info("remote without better chain, enter state rebuild")
			conn.nextState(cStateRebuild)
		} else {
			// determine pivot point to stop fast sync
			if conn.height > fastSyncPivotBlocks {
				conn.pivotPoint = conn.height - fastSyncPivotBlocks
			} else {
				conn.pivotPoint = 0
			}

			log.Infof("Pivot point for fast sync: %d", conn.pivotPoint)

			// first block number
			conn.startBlockNumber = genesis.BlockNumber + 1
			conn.nextState(cStateFetchBlocks) // assume success
			log.Infof("local block number: %d", height)

			blockheader.ClearCache()
			// check digests of descending blocks (to detect a fork)
		check_digests:
			for h := height; h >= genesis.BlockNumber; h -= 1 {
				digest, err := blockheader.DigestForBlock(h)
				if nil != err {
					log.Infof("block number: %d  local digest error: %s", h, err)
					conn.nextState(cStateHighestBlock) // retry
					break check_digests
				}
				d, err := conn.theClient.RemoteDigestOfHeight(h)
				if nil != err {
					log.Infof("block number: %d  fetch digest error: %s", h, err)
					conn.nextState(cStateHighestBlock) // retry
					break check_digests
				} else if d == digest {
					if height-h >= forkProtection {
						log.Errorf("fork protection at: %d - %d >= %d", height, h, forkProtection)
						conn.nextState(cStateHighestBlock)
						break check_digests
					}

					conn.startBlockNumber = h + 1
					log.Infof("fork from block number: %d", conn.startBlockNumber)

					// remove old blocks
					err := block.DeleteDownToBlock(conn.startBlockNumber)
					if nil != err {
						log.Errorf("delete down to block number: %d  error: %s", conn.startBlockNumber, err)
						conn.nextState(cStateHighestBlock) // retry
					}
					break check_digests
				}
			}
		}

	case cStateFetchBlocks:
		continueLooping = false
		var packedBlock []byte
		var packedNextBlock []byte

		// Check fast sync state on each loop
		if conn.fastSyncEnabled && conn.pivotPoint >= conn.startBlockNumber+fastSyncFetchBlocksPerCycle {
			conn.blocksPerCycle = fastSyncFetchBlocksPerCycle
		} else {
			conn.blocksPerCycle = fetchBlocksPerCycle
		}

	fetch_blocks:
		for i := 0; i < conn.blocksPerCycle; i++ {
			if conn.startBlockNumber > conn.height {
				// just in case block height has changed
				log.Infof("height changed from: %d to: %d", conn.height, conn.startBlockNumber)
				conn.nextState(cStateHighestBlock)
				continueLooping = true
				break fetch_blocks
			}

			if conn.startBlockNumber%100 == 0 {
				log.Warnf("fetch block number: %d", conn.startBlockNumber)
			} else {
				log.Infof("fetch block number: %d", conn.startBlockNumber)
			}
			if packedNextBlock == nil {
				p, err := conn.theClient.GetBlockData(conn.startBlockNumber)
				if nil != err {
					log.Errorf("fetch block number: %d  error: %s", conn.startBlockNumber, err)
					conn.nextState(cStateHighestBlock) // retry
					break fetch_blocks
				}
				packedBlock = p
			} else {
				packedBlock = packedNextBlock
			}

			if conn.fastSyncEnabled {
				// test a random block for forgery
				if i > 0 && i%fastSyncSkipPerBlocks == 0 {
					h := conn.startBlockNumber - uint64(rand.Intn(fastSyncSkipPerBlocks))
					log.Debugf("select random block: %d to test for forgery", h)
					digest, err := blockheader.DigestForBlock(h)
					if nil != err {
						log.Infof("block number: %d  local digest error: %s", h, err)
						conn.nextState(cStateHighestBlock) // retry
						break fetch_blocks
					}
					d, err := conn.theClient.RemoteDigestOfHeight(h)
					if nil != err {
						log.Infof("block number: %d  fetch digest error: %s", h, err)
						conn.nextState(cStateHighestBlock) // retry
						break fetch_blocks
					}

					if d != digest {
						log.Warnf("potential block forgery: %d", h)

						// remove old blocks
						startingPoint := conn.startBlockNumber - uint64(i)
						err := block.DeleteDownToBlock(startingPoint)
						if nil != err {
							log.Errorf("delete down to block number: %d  error: %s", startingPoint, err)
						}

						conn.fastSyncEnabled = false
						conn.nextState(cStateHighestBlock)
						conn.startBlockNumber = startingPoint
						break fetch_blocks
					}
				}

				// get next block:
				//   packedNextBlock will be nil when local height is same as remote
				var err error
				packedNextBlock, err = conn.theClient.GetBlockData(conn.startBlockNumber + 1)
				if nil != err {
					log.Debugf("fetch next block number: %d  error: %s", conn.startBlockNumber+1, err)
				}
			} else {
				packedNextBlock = nil
			}

			log.Debugf("store block number: %d", conn.startBlockNumber)
			err := block.StoreIncoming(packedBlock, packedNextBlock, block.NoRescanVerified)
			if nil != err {
				log.Errorf(
					"store block number: %d  error: %s",
					conn.startBlockNumber,
					err,
				)
				conn.nextState(cStateHighestBlock) // retry
				break fetch_blocks
			}

			// next block
			conn.startBlockNumber++
		}

	case cStateRebuild:
		// return to normal operations
		conn.nextState(cStateSampling)
		conn.samples = 0 // zero out the counter
		mode.Set(mode.Normal)
		continueLooping = false

	case cStateSampling:
		// check peers
		globalData.clientCount = conn.getConnectedClientCount()
		if !isConnectionEnough(globalData.clientCount) {
			log.Warnf("connections: %d below minimum client count: %d", globalData.clientCount, minimumClients)
			continueLooping = true
			conn.nextState(cStateConnecting)
			return continueLooping
		}

		log.Infof("connections: %d", globalData.clientCount)

		continueLooping = false

		// check height
		if conn.updateHeightAndClient() {
			height := blockheader.Height()

			log.Infof("height remote: %d, local: %d", conn.height, height)

			if conn.hasBetterChain(height) {
				log.Warn("check height: better chain")
				conn.nextState(cStateForkDetect)
				continueLooping = true
			} else {
				conn.samples = 0
			}
		} else {
			conn.samples++
			if conn.samples > samplingLimit {
				log.Warn("check height: time to resync")
				conn.nextState(cStateForkDetect)
				continueLooping = true
			}
		}

	}
	return continueLooping
}

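// isConnectionEnough checks that the connected client count has reached the
// configured minimum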
func isConnectionEnough(count int) bool {
	return minimumClients <= count
}

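// isSameChain reports whether the elected remote client is on the same chain
// as the local node: the same height and a matching digest at that height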
func (conn *connector) isSameChain() bool {
	if conn.theClient == nil {
		conn.log.Debug("remote client empty")
		return false
	}

	localDigest, err := blockheader.DigestForBlock(blockheader.Height())
	if nil != err {
		return false
	}

	if conn.height == blockheader.Height() && conn.theClient.CachedRemoteDigestOfLocalHeight() == localDigest {
		return true
	}

	return false
}

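// hasBetterChain reports whether the elected remote client offers a chain
// that should replace the local one: a greater height, or an equal height
// with a smaller digest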
func (conn *connector) hasBetterChain(localHeight uint64) bool {
	if conn.theClient == nil {
		conn.log.Debug("remote client empty")
		return false
	}

	if conn.height < localHeight {
		conn.log.Debugf("remote height %d is shorter than local height %d", conn.height, localHeight)
		return false
	}

	if conn.height == localHeight && !conn.hasSmallerDigestThanLocal(localHeight) {
		return false
	}

	return true
}

// different chain but with the same height, so a possible fork exists
// choose the chain that has the smaller digest
func (conn *connector) hasSmallerDigestThanLocal(localHeight uint64) bool {
	remoteDigest := conn.theClient.CachedRemoteDigestOfLocalHeight()

	// if the upstream was updated during processing
	if conn.theClient.LocalHeight() != localHeight {
		conn.log.Warnf("remote height %d is different than local height %d", conn.theClient.LocalHeight(), localHeight)
		return false
	}

	localDigest, err := blockheader.DigestForBlock(localHeight)
	if nil != err {
		conn.log.Warnf("local height: %d  digest error: %s", localHeight, err)
		return false
	}

	return remoteDigest.SmallerDigestThan(localDigest)
}

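// updateHeightAndClient elects the best upstream client by vote and records
// its height; it returns false when no candidate can be elected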
func (conn *connector) updateHeightAndClient() bool {
	conn.votes.Reset()
	conn.votes.SetMinHeight(blockheader.Height())
	conn.startElection()
	elected, height := conn.elected()
	if 0 == height {
		conn.height = 0
		return false
	}

	winnerName := elected.Name()
	remoteAddr, err := elected.RemoteAddr()
	if nil != err {
		conn.log.Warnf("%s socket not connected", winnerName)
		conn.height = 0
		return false
	}

	conn.log.Debugf("winner %s majority height %d, connect to %s",
		winnerName,
		height,
		remoteAddr,
	)

	if height > 0 && nil != elected {
		globalData.blockHeight = height
	}
	conn.theClient = elected
	conn.height = height
	return true
}

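// startElection casts a vote for every client that is connected and has been
// active within activeTime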
func (conn *connector) startElection() {
	conn.allClients(func(client upstream.Upstream, e *list.Element) {
		if client.IsConnected() && client.ActiveInThePast(activeTime) {
			conn.votes.VoteBy(client)
		}
	})
}

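// elected returns the winning client and its height, or nil and zero when
// the election fails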
func (conn *connector) elected() (upstream.Upstream, uint64) {
	elected, height, err := conn.votes.ElectedCandidate()
	if nil != err {
		conn.log.Warnf("get elected with error: %s", err)
		return nil, 0
	}

	remoteAddr, err := elected.RemoteAddr()
	if nil != err {
		conn.log.Errorf("get client string with error: %s", err)
		return nil, 0
	}

	digest := elected.CachedRemoteDigestOfLocalHeight()
	conn.log.Infof(
		"digest: %s elected with %d votes, remote addr: %s, height: %d",
		digest,
		conn.votes.NumVoteOfDigest(digest),
		remoteAddr,
		height,
	)

	return elected, height
}

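// connectUpstream connects the oldest dynamic client to the given node,
// unless a connection to that node already exists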
func (conn *connector) connectUpstream(
	priority string,
	serverPublicKey []byte,
	addresses []byte,
) error {

	log := conn.log

	log.Debugf("connect: %s to: %x @ %x", priority, serverPublicKey, addresses)

	// extract the first valid address
	connV4, connV6 := util.PackedConnection(addresses).Unpack46()

	// need to know if this node has IPv6
	address := connV4
	if nil != connV6 && conn.preferIPv6 {
		address = connV6
	}

	if nil == address {
		log.Errorf(
			"reconnect: %x  error: no suitable address found ipv6 allowed: %t",
			serverPublicKey,
			conn.preferIPv6,
		)
		return fault.AddressIsNil
	}

	log.Infof("connect: %s to: %x @ %s", priority, serverPublicKey, address)

	// see if already connected to this node
	alreadyConnected := false
	conn.searchClients(func(client upstream.Upstream, e *list.Element) bool {
		if client.IsConnectedTo(serverPublicKey) {
			if nil == e {
				log.Debugf(
					"already have static connection to: %x @ %s",
					serverPublicKey,
					*address,
				)
			} else {
				log.Debugf("ignore change to: %x @ %s", serverPublicKey, *address)
				conn.dynamicClients.MoveToBack(e)
			}
			alreadyConnected = true
			return true
		}
		return false
	})

	if alreadyConnected {
		return nil
	}

	// reconnect the oldest entry to new node
	log.Infof("reconnect: %x @ %s", serverPublicKey, *address)
	client := conn.dynamicClients.Front().Value.(upstream.Upstream)
	err := client.Connect(address, serverPublicKey)
	if nil != err {
		log.Errorf("ConnectTo: %x @ %s  error: %s", serverPublicKey, *address, err)
	} else {
		conn.dynamicClients.MoveToBack(conn.dynamicClients.Front())
	}

	return err
}

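// releaseServerKey disconnects the dynamic client matching the given server
// public key; static clients are never released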
func (conn *connector) releaseServerKey(serverPublicKey []byte) error {
	log := conn.log
	conn.searchClients(func(client upstream.Upstream, e *list.Element) bool {
		if bytes.Equal(serverPublicKey, client.ServerPublicKey()) {
			if e == nil { // static clients
				log.Infof("refuse to delete static peer: %x", serverPublicKey)
			} else { // dynamic clients
				client.ResetServer()
				log.Infof("peer: %x is released in upstream", serverPublicKey)
				return true
			}
		}
		return false
	})
	return nil
}

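// nextState advances the state machine to the given state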
func (conn *connector) nextState(newState connectorState) {
	conn.state = newState
}

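// getConnectedClientCount counts the clients that are currently connected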
func (conn *connector) getConnectedClientCount() int {
	clientCount := 0
	conn.allClients(func(client upstream.Upstream, e *list.Element) {
		if client.IsConnected() {
			clientCount++
		}
	})
	return clientCount
}