1// SPDX-License-Identifier: ISC 2// Copyright (c) 2014-2020 Bitmark Inc. 3// Use of this source code is governed by an ISC 4// license that can be found in the LICENSE file. 5 6package peer 7 8import ( 9 "bytes" 10 "container/list" 11 "fmt" 12 "math/rand" 13 "strings" 14 "sync" 15 "time" 16 17 "github.com/bitmark-inc/bitmarkd/block" 18 "github.com/bitmark-inc/bitmarkd/blockheader" 19 "github.com/bitmark-inc/bitmarkd/fault" 20 "github.com/bitmark-inc/bitmarkd/genesis" 21 "github.com/bitmark-inc/bitmarkd/messagebus" 22 "github.com/bitmark-inc/bitmarkd/mode" 23 "github.com/bitmark-inc/bitmarkd/peer/upstream" 24 "github.com/bitmark-inc/bitmarkd/peer/voting" 25 "github.com/bitmark-inc/bitmarkd/util" 26 "github.com/bitmark-inc/bitmarkd/zmqutil" 27 "github.com/bitmark-inc/logger" 28) 29 30// various timeouts 31const ( 32 // pause to limit bandwidth 33 cycleInterval = 15 * time.Second 34 35 // time out for connections 36 connectorTimeout = 60 * time.Second 37 38 // number of cycles to be 1 block out of sync before resync 39 samplingLimit = 6 40 41 // number of blocks to fetch in one set 42 fetchBlocksPerCycle = 200 43 44 // fail to fork if height difference is greater than this 45 forkProtection = 60 46 47 // do not proceed unless this many clients are connected 48 minimumClients = 5 49 50 // total number of dynamic clients 51 maximumDynamicClients = 25 52 53 // client should exist at least 1 response with in this number 54 activeTime = 60 * time.Second 55 56 // fast sync option to fetch block 57 fastSyncFetchBlocksPerCycle = 2000 58 fastSyncSkipPerBlocks = 100 59 fastSyncPivotBlocks = 1000 60) 61 62type connector struct { 63 sync.RWMutex 64 65 log *logger.L 66 preferIPv6 bool 67 68 staticClients []upstream.Upstream 69 70 dynamicClients list.List 71 72 state connectorState 73 74 theClient upstream.Upstream // client used for fetching blocks 75 startBlockNumber uint64 // block number where local chain forks 76 height uint64 // block number on best node 77 samples int // counter to detect missed block broadcast 78 votes voting.Voting 79 80 fastSyncEnabled bool // fast sync mode enabled? 81 blocksPerCycle int // number of blocks to fetch per cycle 82 pivotPoint uint64 // block number to stop fast syncing 83} 84 85// initialise the connector 86func (conn *connector) initialise( 87 privateKey []byte, 88 publicKey []byte, 89 connect []Connection, 90 dynamicEnabled bool, 91 preferIPv6 bool, 92 fastSync bool, 93) error { 94 95 log := logger.New("connector") 96 conn.log = log 97 98 conn.preferIPv6 = preferIPv6 99 100 conn.fastSyncEnabled = fastSync 101 102 log.Info("initialising…") 103 104 // allocate all sockets 105 staticCount := len(connect) // can be zero 106 if 0 == staticCount && !dynamicEnabled { 107 log.Error("zero static connections and dynamic is disabled") 108 return fault.NoConnectionsAvailable 109 } 110 conn.staticClients = make([]upstream.Upstream, staticCount) 111 112 // initially connect all static sockets 113 wg := sync.WaitGroup{} 114 errCh := make(chan error, len(connect)) 115 116 conn.log.Debugf("static connection count: %d", len(connect)) 117 118 for i, c := range connect { 119 wg.Add(1) 120 121 // start new goroutine for each connection 122 go func(conn *connector, c Connection, i int, wg *sync.WaitGroup, ch chan error) { 123 124 // error function call 125 errF := func(wg *sync.WaitGroup, ch chan error, e error) { 126 ch <- e 127 wg.Done() 128 } 129 130 // for canonicaling the error 131 canonicalErrF := func(c Connection, e error) error { 132 return fmt.Errorf("client: %q error: %s", c.Address, e) 133 } 134 135 address, err := util.NewConnection(c.Address) 136 if nil != err { 137 log.Errorf("client[%d]=address: %q error: %s", i, c.Address, err) 138 errF(wg, ch, canonicalErrF(c, err)) 139 return 140 } 141 serverPublicKey, err := zmqutil.ReadPublicKey(c.PublicKey) 142 if nil != err { 143 log.Errorf("client[%d]=public: %q error: %s", i, c.PublicKey, err) 144 errF(wg, ch, canonicalErrF(c, err)) 145 return 146 } 147 148 // prevent connection to self 149 if bytes.Equal(publicKey, serverPublicKey) { 150 err := fault.ConnectingToSelfForbidden 151 log.Errorf("client[%d]=public: %q error: %s", i, c.PublicKey, err) 152 errF(wg, ch, canonicalErrF(c, err)) 153 return 154 } 155 156 client, err := upstream.New(privateKey, publicKey, connectorTimeout) 157 if nil != err { 158 log.Errorf("client[%d]=%q error: %s", i, address, err) 159 errF(wg, ch, canonicalErrF(c, err)) 160 return 161 } 162 163 conn.Lock() 164 conn.staticClients[i] = client 165 globalData.connectorClients = append(globalData.connectorClients, client) 166 conn.Unlock() 167 168 err = client.Connect(address, serverPublicKey) 169 if nil != err { 170 log.Errorf("connect[%d]=%q error: %s", i, address, err) 171 errF(wg, ch, canonicalErrF(c, err)) 172 return 173 } 174 log.Infof("public key: %x at: %q", serverPublicKey, c.Address) 175 wg.Done() 176 177 }(conn, c, i, &wg, errCh) 178 } 179 180 conn.log.Debug("waiting for all static connections...") 181 wg.Wait() 182 183 // drop error channel for getting all errors 184 errs := make([]error, 0) 185 for len(errCh) > 0 { 186 errs = append(errs, <-errCh) 187 } 188 189 // error code for goto fail 190 err := error(nil) 191 192 if len(errs) == 1 { 193 err = errs[0] 194 goto fail 195 } else if len(errs) > 1 { 196 err = compositeError(errs) 197 goto fail 198 } 199 200 // just create sockets for dynamic clients 201 for i := 0; i < maximumDynamicClients; i++ { 202 client, e := upstream.New(privateKey, publicKey, connectorTimeout) 203 if nil != err { 204 log.Errorf("client[%d] error: %s", i, e) 205 err = e 206 goto fail 207 } 208 209 // create list of all dynamic clients 210 conn.dynamicClients.PushBack(client) 211 212 globalData.connectorClients = append(globalData.connectorClients, client) 213 } 214 215 conn.votes = voting.NewVoting() 216 217 // start state machine 218 conn.nextState(cStateConnecting) 219 220 return nil 221 222 // error handling 223fail: 224 conn.destroy() 225 226 return err 227} 228 229// combine multi error into one 230func compositeError(errors []error) error { 231 if nil == errors || 0 == len(errors) { 232 return nil 233 } 234 var ce strings.Builder 235 ce.WriteString("composite error: [") 236 len := len(errors) 237 for i, e := range errors { 238 ce.WriteString(e.Error()) 239 if i < len-1 { 240 ce.WriteString(", ") 241 } 242 } 243 ce.WriteString("]") 244 return fmt.Errorf(ce.String()) 245} 246 247func (conn *connector) allClients( 248 f func(client upstream.Upstream, e *list.Element), 249) { 250 for _, client := range conn.staticClients { 251 if client != nil { 252 f(client, nil) 253 } 254 } 255 for e := conn.dynamicClients.Front(); nil != e; e = e.Next() { 256 if client := e.Value.(upstream.Upstream); client != nil { 257 f(client, e) 258 } 259 } 260} 261 262func (conn *connector) searchClients( 263 f func(client upstream.Upstream, e *list.Element) bool, 264) { 265 for _, client := range conn.staticClients { 266 if f(client, nil) { 267 return 268 } 269 } 270 for e := conn.dynamicClients.Front(); nil != e; e = e.Next() { 271 if f(e.Value.(upstream.Upstream), e) { 272 return 273 } 274 } 275} 276 277func (conn *connector) destroy() { 278 conn.allClients(func(client upstream.Upstream, e *list.Element) { 279 client.Destroy() 280 }) 281} 282 283// various RPC calls to upstream connections 284func (conn *connector) Run(args interface{}, shutdown <-chan struct{}) { 285 log := conn.log 286 287 log.Info("starting…") 288 289 queue := messagebus.Bus.Connector.Chan() 290 291 timer := time.After(cycleInterval) 292 293loop: 294 for { 295 // wait for shutdown 296 log.Debug("waiting…") 297 298 select { 299 case <-shutdown: 300 break loop 301 case <-timer: // timer has priority over queue 302 timer = time.After(cycleInterval) 303 conn.process() 304 case item := <-queue: 305 c, _ := util.PackedConnection(item.Parameters[1]).Unpack() 306 conn.log.Debugf( 307 "received control: %s public key: %x connect: %x %q", 308 item.Command, 309 item.Parameters[0], 310 item.Parameters[1], 311 c, 312 ) 313 314 switch item.Command { 315 case "@D": // internal command: delete a peer 316 conn.releaseServerKey(item.Parameters[0]) 317 conn.log.Infof( 318 "connector receive server public key: %x", 319 item.Parameters[0], 320 ) 321 default: 322 err := conn.connectUpstream( 323 item.Command, 324 item.Parameters[0], 325 item.Parameters[1], 326 ) 327 if nil != err { 328 conn.log.Warnf("connect upstream error: %s", err) 329 } 330 } 331 } 332 } 333 log.Info("shutting down…") 334 conn.destroy() 335 log.Info("stopped") 336} 337 338// process the connect and return response 339func (conn *connector) process() { 340 // run the machine until it pauses 341 for conn.runStateMachine() { 342 } 343} 344 345// run state machine 346// return: 347// true if want more cycles 348// false to pase for I/O 349func (conn *connector) runStateMachine() bool { 350 log := conn.log 351 352 log.Infof("current state: %s", conn.state) 353 354 continueLooping := true 355 356 switch conn.state { 357 case cStateConnecting: 358 mode.Set(mode.Resynchronise) 359 globalData.clientCount = conn.getConnectedClientCount() 360 log.Infof("connections: %d", globalData.clientCount) 361 362 if isConnectionEnough(globalData.clientCount) { 363 conn.nextState(cStateHighestBlock) 364 } else { 365 log.Warnf("connections: %d below minimum client count: %d", globalData.clientCount, minimumClients) 366 messagebus.Bus.Announce.Send("reconnect") 367 } 368 continueLooping = false 369 370 case cStateHighestBlock: 371 if conn.updateHeightAndClient() { 372 log.Infof("highest block number: %d client: %s", conn.height, conn.theClient.Name()) 373 if conn.hasBetterChain(blockheader.Height()) { 374 log.Infof("new chain from %s, height %d, digest %s", conn.theClient.Name(), conn.height, conn.theClient.CachedRemoteDigestOfLocalHeight().String()) 375 log.Info("enter fork detect state") 376 conn.nextState(cStateForkDetect) 377 } else if conn.isSameChain() { 378 log.Info("remote same chain") 379 conn.nextState(cStateRebuild) 380 } else { 381 log.Info("remote chain invalid, stop looping for now") 382 continueLooping = false 383 } 384 } else { 385 log.Warn("highest block: connection lost") 386 conn.nextState(cStateConnecting) 387 continueLooping = false 388 } 389 390 case cStateForkDetect: 391 height := blockheader.Height() 392 if !conn.hasBetterChain(height) { 393 log.Info("remote without better chain, enter state rebuild") 394 conn.nextState(cStateRebuild) 395 } else { 396 // determine pivot point to stop fast sync 397 if conn.height > fastSyncPivotBlocks { 398 conn.pivotPoint = conn.height - fastSyncPivotBlocks 399 } else { 400 conn.pivotPoint = 0 401 } 402 403 log.Infof("Pivot point for fast sync: %d", conn.pivotPoint) 404 405 // first block number 406 conn.startBlockNumber = genesis.BlockNumber + 1 407 conn.nextState(cStateFetchBlocks) // assume success 408 log.Infof("local block number: %d", height) 409 410 blockheader.ClearCache() 411 // check digests of descending blocks (to detect a fork) 412 check_digests: 413 for h := height; h >= genesis.BlockNumber; h -= 1 { 414 digest, err := blockheader.DigestForBlock(h) 415 if nil != err { 416 log.Infof("block number: %d local digest error: %s", h, err) 417 conn.nextState(cStateHighestBlock) // retry 418 break check_digests 419 } 420 d, err := conn.theClient.RemoteDigestOfHeight(h) 421 if nil != err { 422 log.Infof("block number: %d fetch digest error: %s", h, err) 423 conn.nextState(cStateHighestBlock) // retry 424 break check_digests 425 } else if d == digest { 426 if height-h >= forkProtection { 427 log.Errorf("fork protection at: %d - %d >= %d", height, h, forkProtection) 428 conn.nextState(cStateHighestBlock) 429 break check_digests 430 } 431 432 conn.startBlockNumber = h + 1 433 log.Infof("fork from block number: %d", conn.startBlockNumber) 434 435 // remove old blocks 436 err := block.DeleteDownToBlock(conn.startBlockNumber) 437 if nil != err { 438 log.Errorf("delete down to block number: %d error: %s", conn.startBlockNumber, err) 439 conn.nextState(cStateHighestBlock) // retry 440 } 441 break check_digests 442 } 443 } 444 } 445 446 case cStateFetchBlocks: 447 continueLooping = false 448 var packedBlock []byte 449 var packedNextBlock []byte 450 451 // Check fast sync state on each loop 452 if conn.fastSyncEnabled && conn.pivotPoint >= conn.startBlockNumber+fastSyncFetchBlocksPerCycle { 453 conn.blocksPerCycle = fastSyncFetchBlocksPerCycle 454 } else { 455 conn.blocksPerCycle = fetchBlocksPerCycle 456 } 457 458 fetch_blocks: 459 for i := 0; i < conn.blocksPerCycle; i++ { 460 if conn.startBlockNumber > conn.height { 461 // just in case block height has changed 462 log.Infof("height changed from: %d to: %d", conn.height, conn.startBlockNumber) 463 conn.nextState(cStateHighestBlock) 464 continueLooping = true 465 break fetch_blocks 466 } 467 468 if conn.startBlockNumber%100 == 0 { 469 log.Warnf("fetch block number: %d", conn.startBlockNumber) 470 } else { 471 log.Infof("fetch block number: %d", conn.startBlockNumber) 472 } 473 if packedNextBlock == nil { 474 p, err := conn.theClient.GetBlockData(conn.startBlockNumber) 475 if nil != err { 476 log.Errorf("fetch block number: %d error: %s", conn.startBlockNumber, err) 477 conn.nextState(cStateHighestBlock) // retry 478 break fetch_blocks 479 } 480 packedBlock = p 481 } else { 482 packedBlock = packedNextBlock 483 } 484 485 if conn.fastSyncEnabled { 486 // test a random block for forgery 487 if i > 0 && i%fastSyncSkipPerBlocks == 0 { 488 h := conn.startBlockNumber - uint64(rand.Intn(fastSyncSkipPerBlocks)) 489 log.Debugf("select random block: %d to test for forgery", h) 490 digest, err := blockheader.DigestForBlock(h) 491 if nil != err { 492 log.Infof("block number: %d local digest error: %s", h, err) 493 conn.nextState(cStateHighestBlock) // retry 494 break fetch_blocks 495 } 496 d, err := conn.theClient.RemoteDigestOfHeight(h) 497 if nil != err { 498 log.Infof("block number: %d fetch digest error: %s", h, err) 499 conn.nextState(cStateHighestBlock) // retry 500 break fetch_blocks 501 } 502 503 if d != digest { 504 log.Warnf("potetial block forgery: %d", h) 505 506 // remove old blocks 507 startingPoint := conn.startBlockNumber - uint64(i) 508 err := block.DeleteDownToBlock(startingPoint) 509 if nil != err { 510 log.Errorf("delete down to block number: %d error: %s", startingPoint, err) 511 } 512 513 conn.fastSyncEnabled = false 514 conn.nextState(cStateHighestBlock) 515 conn.startBlockNumber = startingPoint 516 break fetch_blocks 517 } 518 } 519 520 // get next block: 521 // packedNextBlock will be nil when local height is same as remote 522 var err error 523 packedNextBlock, err = conn.theClient.GetBlockData(conn.startBlockNumber + 1) 524 if nil != err { 525 log.Debugf("fetch next block number: %d error: %s", conn.startBlockNumber+1, err) 526 } 527 } else { 528 packedNextBlock = nil 529 } 530 531 log.Debugf("store block number: %d", conn.startBlockNumber) 532 err := block.StoreIncoming(packedBlock, packedNextBlock, block.NoRescanVerified) 533 if nil != err { 534 log.Errorf( 535 "store block number: %d error: %s", 536 conn.startBlockNumber, 537 err, 538 ) 539 conn.nextState(cStateHighestBlock) // retry 540 break fetch_blocks 541 } 542 543 // next block 544 conn.startBlockNumber++ 545 } 546 547 case cStateRebuild: 548 // return to normal operations 549 conn.nextState(cStateSampling) 550 conn.samples = 0 // zero out the counter 551 mode.Set(mode.Normal) 552 continueLooping = false 553 554 case cStateSampling: 555 // check peers 556 globalData.clientCount = conn.getConnectedClientCount() 557 if !isConnectionEnough(globalData.clientCount) { 558 log.Warnf("connections: %d below minimum client count: %d", globalData.clientCount, minimumClients) 559 continueLooping = true 560 conn.nextState(cStateConnecting) 561 return continueLooping 562 } 563 564 log.Infof("connections: %d", globalData.clientCount) 565 566 continueLooping = false 567 568 // check height 569 if conn.updateHeightAndClient() { 570 height := blockheader.Height() 571 572 log.Infof("height remote: %d, local: %d", conn.height, height) 573 574 if conn.hasBetterChain(height) { 575 log.Warn("check height: better chain") 576 conn.nextState(cStateForkDetect) 577 continueLooping = true 578 } else { 579 conn.samples = 0 580 } 581 } else { 582 conn.samples++ 583 if conn.samples > samplingLimit { 584 log.Warn("check height: time to resync") 585 conn.nextState(cStateForkDetect) 586 continueLooping = true 587 } 588 } 589 590 } 591 return continueLooping 592} 593 594func isConnectionEnough(count int) bool { 595 return minimumClients <= count 596} 597 598func (conn *connector) isSameChain() bool { 599 if conn.theClient == nil { 600 conn.log.Debug("remote client empty") 601 return false 602 } 603 604 localDigest, err := blockheader.DigestForBlock(blockheader.Height()) 605 if nil != err { 606 return false 607 } 608 609 if conn.height == blockheader.Height() && conn.theClient.CachedRemoteDigestOfLocalHeight() == localDigest { 610 return true 611 } 612 613 return false 614} 615 616func (conn *connector) hasBetterChain(localHeight uint64) bool { 617 if conn.theClient == nil { 618 conn.log.Debug("remote client empty") 619 return false 620 } 621 622 if conn.height < localHeight { 623 conn.log.Debugf("remote height %d is shorter than local height %d", conn.height, localHeight) 624 return false 625 } 626 627 if conn.height == localHeight && !conn.hasSmallerDigestThanLocal(localHeight) { 628 return false 629 } 630 631 return true 632} 633 634// different chain but with same height, possible fork exist 635// choose the chain that has smaller digest 636func (conn *connector) hasSmallerDigestThanLocal(localHeight uint64) bool { 637 remoteDigest := conn.theClient.CachedRemoteDigestOfLocalHeight() 638 639 // if upstream update during processing 640 if conn.theClient.LocalHeight() != localHeight { 641 conn.log.Warnf("remote height %d is different than local height %d", conn.theClient.LocalHeight(), localHeight) 642 return false 643 } 644 645 localDigest, err := blockheader.DigestForBlock(localHeight) 646 if nil != err { 647 conn.log.Warnf("local height: %d digest error: %s", localHeight, err) 648 return false 649 } 650 651 return remoteDigest.SmallerDigestThan(localDigest) 652} 653 654func (conn *connector) updateHeightAndClient() bool { 655 conn.votes.Reset() 656 conn.votes.SetMinHeight(blockheader.Height()) 657 conn.startElection() 658 elected, height := conn.elected() 659 if 0 == height { 660 conn.height = 0 661 return false 662 } 663 664 winnerName := elected.Name() 665 remoteAddr, err := elected.RemoteAddr() 666 if nil != err { 667 conn.log.Warnf("%s socket not connected", winnerName) 668 conn.height = 0 669 return false 670 } 671 672 conn.log.Debugf("winner %s majority height %d, connect to %s", 673 winnerName, 674 height, 675 remoteAddr, 676 ) 677 678 if height > 0 && nil != elected { 679 globalData.blockHeight = height 680 } 681 conn.theClient = elected 682 conn.height = height 683 return true 684} 685 686func (conn *connector) startElection() { 687 conn.allClients(func(client upstream.Upstream, e *list.Element) { 688 if client.IsConnected() && client.ActiveInThePast(activeTime) { 689 conn.votes.VoteBy(client) 690 } 691 }) 692} 693 694func (conn *connector) elected() (upstream.Upstream, uint64) { 695 elected, height, err := conn.votes.ElectedCandidate() 696 if nil != err { 697 conn.log.Warnf("get elected with error: %s", err) 698 return nil, 0 699 } 700 701 remoteAddr, err := elected.RemoteAddr() 702 if nil != err { 703 conn.log.Errorf("get client string with error: %s", err) 704 return nil, 0 705 } 706 707 digest := elected.CachedRemoteDigestOfLocalHeight() 708 conn.log.Infof( 709 "digest: %s elected with %d votes, remote addr: %s, height: %d", 710 digest, 711 conn.votes.NumVoteOfDigest(digest), 712 remoteAddr, 713 height, 714 ) 715 716 return elected, height 717} 718 719func (conn *connector) connectUpstream( 720 priority string, 721 serverPublicKey []byte, 722 addresses []byte, 723) error { 724 725 log := conn.log 726 727 log.Debugf("connect: %s to: %x @ %x", priority, serverPublicKey, addresses) 728 729 // extract the first valid address 730 connV4, connV6 := util.PackedConnection(addresses).Unpack46() 731 732 // need to know if this node has IPv6 733 address := connV4 734 if nil != connV6 && conn.preferIPv6 { 735 address = connV6 736 } 737 738 if nil == address { 739 log.Errorf( 740 "reconnect: %x error: no suitable address found ipv6 allowed: %t", 741 serverPublicKey, 742 conn.preferIPv6, 743 ) 744 return fault.AddressIsNil 745 } 746 747 log.Infof("connect: %s to: %x @ %s", priority, serverPublicKey, address) 748 749 // see if already connected to this node 750 alreadyConnected := false 751 conn.searchClients(func(client upstream.Upstream, e *list.Element) bool { 752 if client.IsConnectedTo(serverPublicKey) { 753 if nil == e { 754 log.Debugf( 755 "already have static connection to: %x @ %s", 756 serverPublicKey, 757 *address, 758 ) 759 } else { 760 log.Debugf("ignore change to: %x @ %s", serverPublicKey, *address) 761 conn.dynamicClients.MoveToBack(e) 762 } 763 alreadyConnected = true 764 return true 765 } 766 return false 767 }) 768 769 if alreadyConnected { 770 return nil 771 } 772 773 // reconnect the oldest entry to new node 774 log.Infof("reconnect: %x @ %s", serverPublicKey, *address) 775 client := conn.dynamicClients.Front().Value.(upstream.Upstream) 776 err := client.Connect(address, serverPublicKey) 777 if nil != err { 778 log.Errorf("ConnectTo: %x @ %s error: %s", serverPublicKey, *address, err) 779 } else { 780 conn.dynamicClients.MoveToBack(conn.dynamicClients.Front()) 781 } 782 783 return err 784} 785 786func (conn *connector) releaseServerKey(serverPublicKey []byte) error { 787 log := conn.log 788 conn.searchClients(func(client upstream.Upstream, e *list.Element) bool { 789 if bytes.Equal(serverPublicKey, client.ServerPublicKey()) { 790 if e == nil { // static Clients 791 log.Infof("refuse to delete static peer: %x", serverPublicKey) 792 } else { // dynamic Clients 793 client.ResetServer() 794 log.Infof("peer: %x is released in upstream", serverPublicKey) 795 return true 796 } 797 } 798 return false 799 }) 800 return nil 801} 802 803func (conn *connector) nextState(newState connectorState) { 804 conn.state = newState 805} 806 807func (conn *connector) getConnectedClientCount() int { 808 clientCount := 0 809 conn.allClients(func(client upstream.Upstream, e *list.Element) { 810 if client.IsConnected() { 811 clientCount++ 812 } 813 }) 814 return clientCount 815} 816