1// Copyright 2013-2020 Aerospike, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aerospike 16 17import ( 18 "errors" 19 "fmt" 20 "net" 21 "runtime/debug" 22 "strings" 23 "sync" 24 "sync/atomic" 25 "time" 26 27 . "github.com/aerospike/aerospike-client-go/logger" 28 "golang.org/x/sync/errgroup" 29 30 . "github.com/aerospike/aerospike-client-go/internal/atomic" 31 . "github.com/aerospike/aerospike-client-go/types" 32) 33 34// Cluster encapsulates the aerospike cluster nodes and manages 35// them. 36type Cluster struct { 37 // Initial host nodes specified by user. 38 seeds SyncVal //[]*Host 39 40 // All aliases for all nodes in cluster. 41 // Only accessed within cluster tend goroutine. 42 aliases SyncVal //map[Host]*Node 43 44 // Map of active nodes in cluster. 45 // Only accessed within cluster tend goroutine. 46 nodesMap SyncVal //map[string]*Node 47 48 // Active nodes in cluster. 49 nodes SyncVal //[]*Node 50 stats map[string]*nodeStats //host => stats 51 statsLock sync.Mutex 52 53 // Hints for best node for a partition 54 partitionWriteMap atomic.Value //partitionMap 55 56 clientPolicy ClientPolicy 57 infoPolicy InfoPolicy 58 connectionThreshold AtomicInt // number of parallel opening connections 59 60 nodeIndex uint64 // only used via atomic operations 61 replicaIndex uint64 // only used via atomic operations 62 63 wgTend sync.WaitGroup 64 tendChannel chan struct{} 65 closed AtomicBool 66 67 // Aerospike v3.6.0+ 68 supportsFloat, supportsBatchIndex, supportsReplicasAll, supportsGeo *AtomicBool 69 70 // User name in UTF-8 encoded bytes. 71 user string 72 73 // Password in hashed format in bytes. 74 password SyncVal // []byte 75} 76 77// NewCluster generates a Cluster instance. 78func NewCluster(policy *ClientPolicy, hosts []*Host) (*Cluster, error) { 79 // Validate the policy params 80 if policy.MinConnectionsPerNode > policy.ConnectionQueueSize { 81 panic("minimum number of connections specified in the ClientPolicy is bigger than total connection pool size") 82 } 83 84 // Default TLS names when TLS enabled. 85 newHosts := make([]*Host, 0, len(hosts)) 86 if policy.TlsConfig != nil && !policy.TlsConfig.InsecureSkipVerify { 87 useClusterName := len(policy.ClusterName) > 0 88 89 for _, host := range hosts { 90 nh := *host 91 if nh.TLSName == "" { 92 if useClusterName { 93 nh.TLSName = policy.ClusterName 94 } else { 95 nh.TLSName = host.Name 96 } 97 } 98 newHosts = append(newHosts, &nh) 99 } 100 hosts = newHosts 101 } 102 103 newCluster := &Cluster{ 104 clientPolicy: *policy, 105 infoPolicy: InfoPolicy{Timeout: policy.Timeout}, 106 tendChannel: make(chan struct{}), 107 108 seeds: *NewSyncVal(hosts), 109 aliases: *NewSyncVal(make(map[Host]*Node)), 110 nodesMap: *NewSyncVal(make(map[string]*Node)), 111 nodes: *NewSyncVal([]*Node{}), 112 stats: map[string]*nodeStats{}, 113 114 password: *NewSyncVal(nil), 115 116 supportsFloat: NewAtomicBool(false), 117 supportsBatchIndex: NewAtomicBool(false), 118 supportsReplicasAll: NewAtomicBool(false), 119 supportsGeo: NewAtomicBool(false), 120 } 121 122 newCluster.partitionWriteMap.Store(make(partitionMap)) 123 124 // setup auth info for cluster 125 if policy.RequiresAuthentication() { 126 if policy.AuthMode == AuthModeExternal && policy.TlsConfig == nil { 127 return nil, errors.New("External Authentication requires TLS configuration to be set, because it sends clear password on the wire.") 128 } 129 130 newCluster.user = policy.User 131 hashedPass, err := hashPassword(policy.Password) 132 if err != nil { 133 return nil, err 134 } 135 newCluster.password = *NewSyncVal(hashedPass) 136 } 137 138 // try to seed connections for first use 139 err := newCluster.waitTillStabilized() 140 141 // apply policy rules 142 if policy.FailIfNotConnected && !newCluster.IsConnected() { 143 if err != nil { 144 return nil, err 145 } 146 return nil, fmt.Errorf("Failed to connect to host(s): %v. The network connection(s) to cluster nodes may have timed out, or the cluster may be in a state of flux.", hosts) 147 } 148 149 // start up cluster maintenance go routine 150 newCluster.wgTend.Add(1) 151 go newCluster.clusterBoss(&newCluster.clientPolicy) 152 153 if err == nil { 154 Logger.Debug("New cluster initialized and ready to be used...") 155 } else { 156 Logger.Error("New cluster was not initialized successfully, but the client will keep trying to connect to the database. Error: %s", err.Error()) 157 } 158 159 return newCluster, err 160} 161 162// String implements the stringer interface 163func (clstr *Cluster) String() string { 164 return fmt.Sprintf("%v", clstr.GetNodes()) 165} 166 167// Maintains the cluster on intervals. 168// All clean up code for cluster is here as well. 169func (clstr *Cluster) clusterBoss(policy *ClientPolicy) { 170 Logger.Info("Starting the cluster tend goroutine...") 171 172 defer func() { 173 if r := recover(); r != nil { 174 Logger.Error("Cluster tend goroutine crashed: %s", debug.Stack()) 175 go clstr.clusterBoss(&clstr.clientPolicy) 176 } 177 }() 178 179 defer clstr.wgTend.Done() 180 181 tendInterval := policy.TendInterval 182 if tendInterval <= 10*time.Millisecond { 183 tendInterval = 10 * time.Millisecond 184 } 185 186Loop: 187 for { 188 select { 189 case <-clstr.tendChannel: 190 // tend channel closed 191 Logger.Debug("Tend channel closed. Shutting down the cluster...") 192 break Loop 193 case <-time.After(tendInterval): 194 tm := time.Now() 195 if err := clstr.tend(); err != nil { 196 Logger.Warn(err.Error()) 197 } 198 199 // Tending took longer than requested tend interval. 200 // Tending is too slow for the cluster, and may be falling behind scheule. 201 if tendDuration := time.Since(tm); tendDuration > clstr.clientPolicy.TendInterval { 202 Logger.Warn("Tending took %s, while your requested ClientPolicy.TendInterval is %s. Tends are slower than the interval, and may be falling behind the changes in the cluster.", tendDuration, clstr.clientPolicy.TendInterval) 203 } 204 } 205 } 206 207 // cleanup code goes here 208 // close the nodes 209 nodeArray := clstr.GetNodes() 210 for _, node := range nodeArray { 211 node.Close() 212 } 213} 214 215// AddSeeds adds new hosts to the cluster. 216// They will be added to the cluster on next tend call. 217func (clstr *Cluster) AddSeeds(hosts []*Host) { 218 clstr.seeds.Update(func(val interface{}) (interface{}, error) { 219 seeds := val.([]*Host) 220 seeds = append(seeds, hosts...) 221 return seeds, nil 222 }) 223} 224 225// Updates cluster state 226func (clstr *Cluster) tend() error { 227 228 nodes := clstr.GetNodes() 229 nodeCountBeforeTend := len(nodes) 230 231 // All node additions/deletions are performed in tend goroutine. 232 // If active nodes don't exist, seed cluster. 233 if len(nodes) == 0 { 234 Logger.Info("No connections available; seeding...") 235 if newNodesFound, err := clstr.seedNodes(); !newNodesFound { 236 return err 237 } 238 239 // refresh nodes list after seeding 240 nodes = clstr.GetNodes() 241 } 242 243 peers := newPeers(len(nodes)+16, 16) 244 245 floatSupport := true 246 batchIndexSupport := true 247 geoSupport := true 248 249 for _, node := range nodes { 250 // Clear node reference counts. 251 node.referenceCount.Set(0) 252 node.partitionChanged.Set(false) 253 if !node.supportsPeers.Get() { 254 peers.usePeers.Set(false) 255 } 256 } 257 258 wg := sync.WaitGroup{} 259 wg.Add(len(nodes)) 260 for _, node := range nodes { 261 go func(node *Node) { 262 defer wg.Done() 263 if err := node.Refresh(peers); err != nil { 264 Logger.Debug("Error occurred while refreshing node: %s", node.String()) 265 } 266 }(node) 267 } 268 wg.Wait() 269 270 // Refresh peers when necessary. 271 if peers.usePeers.Get() && (peers.genChanged.Get() || len(peers.peers()) != nodeCountBeforeTend) { 272 // Refresh peers for all nodes that responded the first time even if only one node's peers changed. 273 peers.refreshCount.Set(0) 274 275 wg.Add(len(nodes)) 276 for _, node := range nodes { 277 go func(node *Node) { 278 defer wg.Done() 279 node.refreshPeers(peers) 280 }(node) 281 } 282 wg.Wait() 283 } 284 285 var partitionMap partitionMap 286 287 // Use the following function to allocate memory for the partitionMap on demand. 288 // This will prevent the allocation when the cluster is stable, and make tend a bit faster. 289 pmlock := new(sync.Mutex) 290 setPartitionMap := func(l *sync.Mutex) { 291 l.Lock() 292 defer l.Unlock() 293 if partitionMap == nil { 294 partitionMap = clstr.getPartitions().clone() 295 } 296 } 297 298 // find the first host that connects 299 for _, _peer := range peers.peers() { 300 if clstr.peerExists(peers, _peer.nodeName) { 301 // Node already exists. Do not even try to connect to hosts. 302 continue 303 } 304 305 wg.Add(1) 306 go func(__peer *peer) { 307 defer wg.Done() 308 for _, host := range __peer.hosts { 309 // attempt connection to the host 310 nv := nodeValidator{} 311 if err := nv.validateNode(clstr, host); err != nil { 312 Logger.Warn("Add node `%s` failed: `%s`", host, err) 313 continue 314 } 315 316 // Must look for new node name in the unlikely event that node names do not agree. 317 if __peer.nodeName != nv.name { 318 Logger.Warn("Peer node `%s` is different than actual node `%s` for host `%s`", __peer.nodeName, nv.name, host) 319 } 320 321 if clstr.peerExists(peers, nv.name) { 322 // Node already exists. Do not even try to connect to hosts. 323 break 324 } 325 326 // Create new node. 327 node := clstr.createNode(&nv) 328 peers.addNode(nv.name, node) 329 setPartitionMap(pmlock) 330 node.refreshPartitions(peers, partitionMap) 331 break 332 } 333 }(_peer) 334 } 335 336 // Refresh partition map when necessary. 337 wg.Add(len(nodes)) 338 for _, node := range nodes { 339 go func(node *Node) { 340 defer wg.Done() 341 if node.partitionChanged.Get() { 342 setPartitionMap(pmlock) 343 node.refreshPartitions(peers, partitionMap) 344 } 345 }(node) 346 } 347 348 // This waits for the both steps above 349 wg.Wait() 350 351 if peers.genChanged.Get() || !peers.usePeers.Get() { 352 // Handle nodes changes determined from refreshes. 353 removeList := clstr.findNodesToRemove(peers.refreshCount.Get()) 354 355 // Remove nodes in a batch. 356 if len(removeList) > 0 { 357 for _, n := range removeList { 358 Logger.Debug("The following nodes will be removed: %s", n) 359 } 360 clstr.removeNodes(removeList) 361 } 362 363 clstr.aggregateNodestats(removeList) 364 } 365 366 // Add nodes in a batch. 367 if len(peers.nodes()) > 0 { 368 clstr.addNodes(peers.nodes()) 369 } 370 371 if !floatSupport { 372 Logger.Warn("Some cluster nodes do not support float type. Disabling native float support in the client library...") 373 } 374 375 // set the cluster supported features 376 clstr.supportsFloat.Set(floatSupport) 377 clstr.supportsBatchIndex.Set(batchIndexSupport) 378 clstr.supportsGeo.Set(geoSupport) 379 380 // update all partitions in one go 381 updatePartitionMap := false 382 for _, node := range clstr.GetNodes() { 383 if node.partitionChanged.Get() { 384 updatePartitionMap = true 385 break 386 } 387 } 388 389 if updatePartitionMap { 390 clstr.setPartitions(partitionMap) 391 } 392 393 if err := clstr.getPartitions().validate(); err != nil { 394 Logger.Debug("Error validating the cluster partition map after tend: %s", err.Error()) 395 } 396 397 // only log if node count is changed 398 if nodeCountBeforeTend != len(clstr.GetNodes()) { 399 Logger.Info("Tend finished. Live node count changes from %d to %d", nodeCountBeforeTend, len(clstr.GetNodes())) 400 } 401 402 clstr.aggregateNodestats(clstr.GetNodes()) 403 404 return nil 405} 406 407func (clstr *Cluster) aggregateNodestats(nodeList []*Node) { 408 // update stats 409 clstr.statsLock.Lock() 410 defer clstr.statsLock.Unlock() 411 412 for _, node := range nodeList { 413 h := node.host.String() 414 if stats, exists := clstr.stats[h]; exists { 415 stats.aggregate(node.stats.getAndReset()) 416 } else { 417 clstr.stats[h] = node.stats.getAndReset() 418 } 419 } 420} 421 422func (clstr *Cluster) statsCopy() map[string]nodeStats { 423 clstr.statsLock.Lock() 424 defer clstr.statsLock.Unlock() 425 426 res := make(map[string]nodeStats, len(clstr.stats)) 427 for _, node := range clstr.GetNodes() { 428 h := node.host.String() 429 if stats, exists := clstr.stats[h]; exists { 430 statsCopy := stats.clone() 431 statsCopy.ConnectionsOpen = int64(node.connectionCount.Get()) 432 res[h] = statsCopy 433 } 434 } 435 436 // stats for nodes which do not exist anymore 437 for h, stats := range clstr.stats { 438 if _, exists := res[h]; !exists { 439 stats.ConnectionsOpen = 0 440 res[h] = stats.clone() 441 } 442 } 443 444 return res 445} 446 447func (clstr *Cluster) peerExists(peers *peers, nodeName string) bool { 448 node := clstr.findNodeByName(nodeName) 449 if node != nil { 450 node.referenceCount.IncrementAndGet() 451 return true 452 } 453 454 node = peers.nodeByName(nodeName) 455 if node != nil { 456 node.referenceCount.IncrementAndGet() 457 return true 458 } 459 460 return false 461} 462 463// Tend the cluster until it has stabilized and return control. 464// This helps avoid initial database request timeout issues when 465// a large number of threads are initiated at client startup. 466// 467// If the cluster has not stabilized by the timeout, return 468// control as well. Do not return an error since future 469// database requests may still succeed. 470func (clstr *Cluster) waitTillStabilized() error { 471 count := -1 472 473 doneCh := make(chan error, 10) 474 475 // will run until the cluster is stabilized 476 go func() { 477 var err error 478 for { 479 if err = clstr.tend(); err != nil { 480 if aerr, ok := err.(AerospikeError); ok { 481 switch aerr.ResultCode() { 482 case NOT_AUTHENTICATED, CLUSTER_NAME_MISMATCH_ERROR: 483 doneCh <- err 484 return 485 } 486 } 487 Logger.Warn(err.Error()) 488 } 489 490 // // if there are no errors in connecting to the cluster, then validate the partition table 491 // if err == nil { 492 // err = clstr.getPartitions().validate() 493 // } 494 495 // Check to see if cluster has changed since the last Tend(). 496 // If not, assume cluster has stabilized and return. 497 if count == len(clstr.GetNodes()) { 498 break 499 } 500 501 time.Sleep(time.Millisecond) 502 503 count = len(clstr.GetNodes()) 504 } 505 doneCh <- err 506 }() 507 508 select { 509 case <-time.After(clstr.clientPolicy.Timeout): 510 if clstr.clientPolicy.FailIfNotConnected { 511 clstr.Close() 512 } 513 return errors.New("Connecting to the cluster timed out.") 514 case err := <-doneCh: 515 if err != nil && clstr.clientPolicy.FailIfNotConnected { 516 clstr.Close() 517 } 518 return err 519 } 520} 521 522func (clstr *Cluster) findAlias(alias *Host) *Node { 523 res, _ := clstr.aliases.GetSyncedVia(func(val interface{}) (interface{}, error) { 524 aliases := val.(map[Host]*Node) 525 return aliases[*alias], nil 526 }) 527 528 return res.(*Node) 529} 530 531func (clstr *Cluster) setPartitions(partMap partitionMap) { 532 if err := partMap.validate(); err != nil { 533 Logger.Error("Partition map error: %s.", err.Error()) 534 } 535 536 clstr.partitionWriteMap.Store(partMap) 537} 538 539func (clstr *Cluster) getPartitions() partitionMap { 540 return clstr.partitionWriteMap.Load().(partitionMap) 541} 542 543// discoverSeeds will lookup the seed hosts and convert seed hosts 544// to IP addresses. 545func discoverSeedIPs(seeds []*Host) (res []*Host) { 546 for _, seed := range seeds { 547 addresses, err := net.LookupHost(seed.Name) 548 if err != nil { 549 continue 550 } 551 552 for i := range addresses { 553 h := *seed 554 h.Name = addresses[i] 555 res = append(res, &h) 556 } 557 } 558 559 return res 560} 561 562// Adds seeds to the cluster 563func (clstr *Cluster) seedNodes() (bool, error) { 564 // Must copy array reference for copy on write semantics to work. 565 seedArrayIfc, _ := clstr.seeds.GetSyncedVia(func(val interface{}) (interface{}, error) { 566 seeds := val.([]*Host) 567 seeds_copy := make([]*Host, len(seeds)) 568 copy(seeds_copy, seeds) 569 570 return seeds_copy, nil 571 }) 572 573 // discover seed IPs from DNS or Load Balancers 574 seedArray := discoverSeedIPs(seedArrayIfc.([]*Host)) 575 576 successChan := make(chan struct{}, len(seedArray)) 577 errChan := make(chan error, len(seedArray)) 578 579 Logger.Info("Seeding the cluster. Seeds count: %d", len(seedArray)) 580 581 // Add all nodes at once to avoid copying entire array multiple times. 582 for i, seed := range seedArray { 583 go func(index int, seed *Host) { 584 nodesToAdd := make(nodesToAddT, 128) 585 nv := nodeValidator{} 586 err := nv.seedNodes(clstr, seed, nodesToAdd) 587 if err != nil { 588 Logger.Warn("Seed %s failed: %s", seed.String(), err.Error()) 589 errChan <- err 590 return 591 } 592 clstr.addNodes(nodesToAdd) 593 successChan <- struct{}{} 594 }(i, seed) 595 } 596 597 errorList := make([]error, 0, len(seedArray)) 598 seedCount := len(seedArray) 599L: 600 for { 601 select { 602 case err := <-errChan: 603 errorList = append(errorList, err) 604 seedCount-- 605 if seedCount <= 0 { 606 break L 607 } 608 case <-successChan: 609 // even one seed is enough 610 return true, nil 611 case <-time.After(clstr.clientPolicy.Timeout): 612 // time is up, no seeds found 613 break L 614 } 615 } 616 617 var errStrs []string 618 for _, err := range errorList { 619 if err != nil { 620 if aerr, ok := err.(AerospikeError); ok { 621 switch aerr.ResultCode() { 622 case NOT_AUTHENTICATED: 623 return false, NewAerospikeError(NOT_AUTHENTICATED) 624 case CLUSTER_NAME_MISMATCH_ERROR: 625 return false, aerr 626 } 627 } 628 errStrs = append(errStrs, err.Error()) 629 } 630 } 631 632 return false, NewAerospikeError(INVALID_NODE_ERROR, "Failed to connect to hosts:"+strings.Join(errStrs, "\n")) 633} 634 635func (clstr *Cluster) createNode(nv *nodeValidator) *Node { 636 return newNode(clstr, nv) 637} 638 639// Finds a node by name in a list of nodes 640func (clstr *Cluster) findNodeName(list []*Node, name string) bool { 641 for _, node := range list { 642 if node.GetName() == name { 643 return true 644 } 645 } 646 return false 647} 648 649func (clstr *Cluster) addAlias(host *Host, node *Node) { 650 if host != nil && node != nil { 651 clstr.aliases.Update(func(val interface{}) (interface{}, error) { 652 aliases := val.(map[Host]*Node) 653 aliases[*host] = node 654 return aliases, nil 655 }) 656 } 657} 658 659func (clstr *Cluster) removeAlias(alias *Host) { 660 if alias != nil { 661 clstr.aliases.Update(func(val interface{}) (interface{}, error) { 662 aliases := val.(map[Host]*Node) 663 delete(aliases, *alias) 664 return aliases, nil 665 }) 666 } 667} 668 669func (clstr *Cluster) findNodesToRemove(refreshCount int) []*Node { 670 nodes := clstr.GetNodes() 671 672 removeList := []*Node{} 673 674 for _, node := range nodes { 675 if !node.IsActive() { 676 // Inactive nodes must be removed. 677 removeList = append(removeList, node) 678 continue 679 } 680 681 // Single node clusters rely on whether it responded to info requests. 682 if refreshCount == 0 && node.failures.Get() >= 5 { 683 // All node info requests failed and this node had 5 consecutive failures. 684 // Remove node. If no nodes are left, seeds will be tried in next cluster 685 // tend iteration. 686 removeList = append(removeList, node) 687 continue 688 } 689 690 // Two node clusters require at least one successful refresh before removing. 691 if len(nodes) > 1 && refreshCount >= 1 && node.referenceCount.Get() == 0 { 692 // Node is not referenced by other nodes. 693 // Check if node responded to info request. 694 if node.failures.Get() == 0 { 695 // Node is alive, but not referenced by other nodes. Check if mapped. 696 if !clstr.findNodeInPartitionMap(node) { 697 // Node doesn't have any partitions mapped to it. 698 // There is no point in keeping it in the cluster. 699 removeList = append(removeList, node) 700 } 701 } else { 702 // Node not responding. Remove it. 703 removeList = append(removeList, node) 704 } 705 } 706 } 707 708 return removeList 709} 710 711func (clstr *Cluster) findNodeInPartitionMap(filter *Node) bool { 712 partitionMap := clstr.getPartitions() 713 714 for _, partitions := range partitionMap { 715 for _, nodeArray := range partitions.Replicas { 716 for _, node := range nodeArray { 717 // Use reference equality for performance. 718 if node == filter { 719 return true 720 } 721 } 722 } 723 } 724 return false 725} 726 727func (clstr *Cluster) addNodes(nodesToAdd map[string]*Node) { 728 clstr.nodes.Update(func(val interface{}) (interface{}, error) { 729 nodes := val.([]*Node) 730 for _, node := range nodesToAdd { 731 if node != nil && !clstr.findNodeName(nodes, node.name) { 732 Logger.Debug("Adding node %s (%s) to the cluster.", node.name, node.host.String()) 733 nodes = append(nodes, node) 734 } 735 } 736 737 nodesMap := make(map[string]*Node, len(nodes)) 738 nodesAliases := make(map[Host]*Node, len(nodes)) 739 for i := range nodes { 740 nodesMap[nodes[i].name] = nodes[i] 741 742 for _, alias := range nodes[i].GetAliases() { 743 nodesAliases[*alias] = nodes[i] 744 } 745 } 746 747 clstr.nodesMap.Set(nodesMap) 748 clstr.aliases.Set(nodesAliases) 749 750 return nodes, nil 751 }) 752} 753 754func (clstr *Cluster) removeNodes(nodesToRemove []*Node) { 755 756 // There is no need to delete nodes from partitionWriteMap because the nodes 757 // have already been set to inactive. 758 759 // Cleanup node resources. 760 for _, node := range nodesToRemove { 761 // Remove node's aliases from cluster alias set. 762 // Aliases are only used in tend goroutine, so synchronization is not necessary. 763 clstr.aliases.Update(func(val interface{}) (interface{}, error) { 764 aliases := val.(map[Host]*Node) 765 for _, alias := range node.GetAliases() { 766 delete(aliases, *alias) 767 } 768 return aliases, nil 769 }) 770 771 clstr.nodesMap.Update(func(val interface{}) (interface{}, error) { 772 nodesMap := val.(map[string]*Node) 773 delete(nodesMap, node.name) 774 return nodesMap, nil 775 }) 776 777 node.Close() 778 } 779 780 // Remove all nodes at once to avoid copying entire array multiple times. 781 clstr.nodes.Update(func(val interface{}) (interface{}, error) { 782 nodes := val.([]*Node) 783 nlist := make([]*Node, 0, len(nodes)) 784 nlist = append(nlist, nodes...) 785 for i, n := range nlist { 786 for _, ntr := range nodesToRemove { 787 if ntr.Equals(n) { 788 nlist[i] = nil 789 } 790 } 791 } 792 793 newNodes := make([]*Node, 0, len(nlist)) 794 for i := range nlist { 795 if nlist[i] != nil { 796 newNodes = append(newNodes, nlist[i]) 797 } 798 } 799 800 return newNodes, nil 801 }) 802 803} 804 805// IsConnected returns true if cluster has nodes and is not already closed. 806func (clstr *Cluster) IsConnected() bool { 807 // Must copy array reference for copy on write semantics to work. 808 nodeArray := clstr.GetNodes() 809 return (len(nodeArray) > 0) && !clstr.closed.Get() 810} 811 812// GetRandomNode returns a random node on the cluster 813func (clstr *Cluster) GetRandomNode() (*Node, error) { 814 // Must copy array reference for copy on write semantics to work. 815 nodeArray := clstr.GetNodes() 816 length := len(nodeArray) 817 for i := 0; i < length; i++ { 818 // Must handle concurrency with other non-tending goroutines, so nodeIndex is consistent. 819 index := int(atomic.AddUint64(&clstr.nodeIndex, 1) % uint64(length)) 820 node := nodeArray[index] 821 822 if node != nil && node.IsActive() { 823 // Logger.Debug("Node `%s` is active. index=%d", node, index) 824 return node, nil 825 } 826 } 827 828 return nil, NewAerospikeError(INVALID_NODE_ERROR, "Cluster is empty.") 829} 830 831// GetNodes returns a list of all nodes in the cluster 832func (clstr *Cluster) GetNodes() []*Node { 833 // Must copy array reference for copy on write semantics to work. 834 return clstr.nodes.Get().([]*Node) 835} 836 837// GetSeeds returns a list of all seed nodes in the cluster 838func (clstr *Cluster) GetSeeds() []Host { 839 res, _ := clstr.seeds.GetSyncedVia(func(val interface{}) (interface{}, error) { 840 seeds := val.([]*Host) 841 res := make([]Host, 0, len(seeds)) 842 for _, seed := range seeds { 843 res = append(res, *seed) 844 } 845 846 return res, nil 847 }) 848 849 return res.([]Host) 850} 851 852// GetAliases returns a list of all node aliases in the cluster 853func (clstr *Cluster) GetAliases() map[Host]*Node { 854 res, _ := clstr.aliases.GetSyncedVia(func(val interface{}) (interface{}, error) { 855 aliases := val.(map[Host]*Node) 856 res := make(map[Host]*Node, len(aliases)) 857 for h, n := range aliases { 858 res[h] = n 859 } 860 861 return res, nil 862 }) 863 864 return res.(map[Host]*Node) 865} 866 867// GetNodeByName finds a node by name and returns an 868// error if the node is not found. 869func (clstr *Cluster) GetNodeByName(nodeName string) (*Node, error) { 870 node := clstr.findNodeByName(nodeName) 871 872 if node == nil { 873 return nil, NewAerospikeError(INVALID_NODE_ERROR, "Invalid node name"+nodeName) 874 } 875 return node, nil 876} 877 878func (clstr *Cluster) findNodeByName(nodeName string) *Node { 879 // Must copy array reference for copy on write semantics to work. 880 for _, node := range clstr.GetNodes() { 881 if node.GetName() == nodeName { 882 return node 883 } 884 } 885 return nil 886} 887 888// Close closes all cached connections to the cluster nodes 889// and stops the tend goroutine. 890func (clstr *Cluster) Close() { 891 if clstr.closed.CompareAndToggle(false) { 892 // send close signal to maintenance channel 893 close(clstr.tendChannel) 894 895 // wait until tend is over 896 clstr.wgTend.Wait() 897 898 // remove node references from the partition table 899 // to allow GC to work its magic. Leaks otherwise. 900 clstr.getPartitions().cleanup() 901 } 902} 903 904// MigrationInProgress determines if any node in the cluster 905// is participating in a data migration 906func (clstr *Cluster) MigrationInProgress(timeout time.Duration) (res bool, err error) { 907 if timeout <= 0 { 908 timeout = _DEFAULT_TIMEOUT 909 } 910 911 done := make(chan bool, 1) 912 913 go func() { 914 // this function is guaranteed to return after _DEFAULT_TIMEOUT 915 nodes := clstr.GetNodes() 916 for _, node := range nodes { 917 if node.IsActive() { 918 if res, err = node.MigrationInProgress(); res || err != nil { 919 done <- true 920 return 921 } 922 } 923 } 924 925 res, err = false, nil 926 done <- false 927 }() 928 929 dealine := time.After(timeout) 930 for { 931 select { 932 case <-dealine: 933 return false, NewAerospikeError(TIMEOUT) 934 case <-done: 935 return res, err 936 } 937 } 938} 939 940// WaitUntillMigrationIsFinished will block until all 941// migration operations in the cluster all finished. 942func (clstr *Cluster) WaitUntillMigrationIsFinished(timeout time.Duration) (err error) { 943 if timeout <= 0 { 944 timeout = _NO_TIMEOUT 945 } 946 done := make(chan error, 1) 947 948 go func() { 949 // this function is guaranteed to return after timeout 950 // no go routines will be leaked 951 for { 952 if res, err := clstr.MigrationInProgress(timeout); err != nil || !res { 953 done <- err 954 return 955 } 956 } 957 }() 958 959 dealine := time.After(timeout) 960 select { 961 case <-dealine: 962 return NewAerospikeError(TIMEOUT) 963 case err = <-done: 964 return err 965 } 966} 967 968// Password returns the password that is currently used with the cluster. 969func (clstr *Cluster) Password() (res []byte) { 970 pass := clstr.password.Get() 971 if pass != nil { 972 return pass.([]byte) 973 } 974 return nil 975} 976 977func (clstr *Cluster) changePassword(user string, password string, hash []byte) { 978 // change password ONLY if the user is the same 979 if clstr.user == user { 980 clstr.clientPolicy.Password = password 981 clstr.password.Set(hash) 982 } 983} 984 985// ClientPolicy returns the client policy that is currently used with the cluster. 986func (clstr *Cluster) ClientPolicy() (res ClientPolicy) { 987 return clstr.clientPolicy 988} 989 990// WarmUp fills the connection pool with connections for all nodes. 991// This is necessary on startup for high traffic programs. 992// If the count is <= 0, the connection queue will be filled. 993// If the count is more than the size of the pool, the pool will be filled. 994// Note: One connection per node is reserved for tend operations and is not used for transactions. 995func (clstr *Cluster) WarmUp(count int) (int, error) { 996 var g errgroup.Group 997 cnt := NewAtomicInt(0) 998 nodes := clstr.GetNodes() 999 for i := range nodes { 1000 node := nodes[i] 1001 g.Go(func() error { 1002 n, err := node.WarmUp(count) 1003 cnt.AddAndGet(n) 1004 1005 return err 1006 }) 1007 } 1008 1009 err := g.Wait() 1010 return cnt.Get(), err 1011} 1012