// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

import (
	"errors"
	"fmt"
	"net"
	"runtime/debug"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	. "github.com/aerospike/aerospike-client-go/logger"
	"golang.org/x/sync/errgroup"

	. "github.com/aerospike/aerospike-client-go/internal/atomic"
	. "github.com/aerospike/aerospike-client-go/types"
)

// Cluster encapsulates the Aerospike cluster nodes and manages them.
type Cluster struct {
	// Initial host nodes specified by user.
	seeds SyncVal //[]*Host

	// All aliases for all nodes in cluster.
	// Only accessed within cluster tend goroutine.
	aliases SyncVal //map[Host]*Node

	// Map of active nodes in cluster.
	// Only accessed within cluster tend goroutine.
	nodesMap SyncVal //map[string]*Node

	// Active nodes in cluster.
	nodes     SyncVal               //[]*Node
	stats     map[string]*nodeStats //host => stats
	statsLock sync.Mutex

	// Hints for best node for a partition
	partitionWriteMap atomic.Value //partitionMap

	clientPolicy        ClientPolicy
	infoPolicy          InfoPolicy
	connectionThreshold AtomicInt // number of parallel opening connections

	nodeIndex    uint64 // only used via atomic operations
	replicaIndex uint64 // only used via atomic operations

	wgTend      sync.WaitGroup
	tendChannel chan struct{}
	closed      AtomicBool

	// Aerospike v3.6.0+
	supportsFloat, supportsBatchIndex, supportsReplicasAll, supportsGeo *AtomicBool

	// User name in UTF-8 encoded bytes.
	user string

	// Password in hashed format in bytes.
	password SyncVal // []byte
}
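
// The partitionWriteMap field above is read lock-free through atomic.Value and is
// only swapped wholesale by the tend goroutine. A minimal sketch of that
// copy-on-write pattern, mirroring getPartitions/setPartitions further below
// (the local variable names are illustrative only):
//
//	// reader: load the current map without taking a lock
//	pmap := clstr.partitionWriteMap.Load().(partitionMap)
//
//	// writer (tend goroutine only): clone, update, then store the replacement
//	clone := pmap.clone()
//	// ... apply partition changes from the refreshed nodes to clone ...
//	clstr.partitionWriteMap.Store(clone)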

// NewCluster generates a Cluster instance.
func NewCluster(policy *ClientPolicy, hosts []*Host) (*Cluster, error) {
	// Validate the policy params
	if policy.MinConnectionsPerNode > policy.ConnectionQueueSize {
		return nil, fmt.Errorf("minimum number of connections (%d) specified in the ClientPolicy is bigger than the total connection pool size (%d)", policy.MinConnectionsPerNode, policy.ConnectionQueueSize)
	}

	// Set default TLS names when TLS is enabled.
	newHosts := make([]*Host, 0, len(hosts))
	if policy.TlsConfig != nil && !policy.TlsConfig.InsecureSkipVerify {
		useClusterName := len(policy.ClusterName) > 0

		for _, host := range hosts {
			nh := *host
			if nh.TLSName == "" {
				if useClusterName {
					nh.TLSName = policy.ClusterName
				} else {
					nh.TLSName = host.Name
				}
			}
			newHosts = append(newHosts, &nh)
		}
		hosts = newHosts
	}

	newCluster := &Cluster{
		clientPolicy: *policy,
		infoPolicy:   InfoPolicy{Timeout: policy.Timeout},
		tendChannel:  make(chan struct{}),

		seeds:    *NewSyncVal(hosts),
		aliases:  *NewSyncVal(make(map[Host]*Node)),
		nodesMap: *NewSyncVal(make(map[string]*Node)),
		nodes:    *NewSyncVal([]*Node{}),
		stats:    map[string]*nodeStats{},

		password: *NewSyncVal(nil),

		supportsFloat:       NewAtomicBool(false),
		supportsBatchIndex:  NewAtomicBool(false),
		supportsReplicasAll: NewAtomicBool(false),
		supportsGeo:         NewAtomicBool(false),
	}

	newCluster.partitionWriteMap.Store(make(partitionMap))

	// set up auth info for the cluster
	if policy.RequiresAuthentication() {
		if policy.AuthMode == AuthModeExternal && policy.TlsConfig == nil {
			return nil, errors.New("External authentication requires a TLS configuration to be set, because it sends the password in clear text on the wire.")
		}

		newCluster.user = policy.User
		hashedPass, err := hashPassword(policy.Password)
		if err != nil {
			return nil, err
		}
		newCluster.password = *NewSyncVal(hashedPass)
	}

	// try to seed connections for first use
	err := newCluster.waitTillStabilized()

	// apply policy rules
	if policy.FailIfNotConnected && !newCluster.IsConnected() {
		if err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("Failed to connect to host(s): %v. The network connection(s) to cluster nodes may have timed out, or the cluster may be in a state of flux.", hosts)
	}

	// start up the cluster maintenance goroutine
	newCluster.wgTend.Add(1)
	go newCluster.clusterBoss(&newCluster.clientPolicy)

	if err == nil {
		Logger.Debug("New cluster initialized and ready to be used...")
	} else {
		Logger.Error("New cluster was not initialized successfully, but the client will keep trying to connect to the database. Error: %s", err.Error())
	}

	return newCluster, err
}
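
// A minimal usage sketch for NewCluster (illustrative only). NewClientPolicy and
// NewHost are assumed to come from this package's public API; the address, port,
// and timeout below are placeholders, not values taken from this file:
//
//	policy := NewClientPolicy()
//	policy.Timeout = 5 * time.Second
//	policy.FailIfNotConnected = true
//
//	cluster, err := NewCluster(policy, []*Host{NewHost("127.0.0.1", 3000)})
//	if err != nil {
//		return err // no usable cluster, or it connected only partially
//	}
//	defer cluster.Close()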

// String implements the fmt.Stringer interface.
func (clstr *Cluster) String() string {
	return fmt.Sprintf("%v", clstr.GetNodes())
}

// clusterBoss maintains the cluster at regular intervals.
// All cleanup code for the cluster lives here as well.
func (clstr *Cluster) clusterBoss(policy *ClientPolicy) {
	Logger.Info("Starting the cluster tend goroutine...")

	defer func() {
		if r := recover(); r != nil {
			Logger.Error("Cluster tend goroutine crashed: %s", debug.Stack())
			// The wgTend.Done deferred below has already run by the time this
			// recover handler fires (defers run LIFO), so re-register the
			// respawned goroutine with the wait group.
			clstr.wgTend.Add(1)
			go clstr.clusterBoss(&clstr.clientPolicy)
		}
	}()

	defer clstr.wgTend.Done()

	tendInterval := policy.TendInterval
	if tendInterval <= 10*time.Millisecond {
		tendInterval = 10 * time.Millisecond
	}

Loop:
	for {
		select {
		case <-clstr.tendChannel:
			// tend channel closed
			Logger.Debug("Tend channel closed. Shutting down the cluster...")
			break Loop
		case <-time.After(tendInterval):
			tm := time.Now()
			if err := clstr.tend(); err != nil {
				Logger.Warn(err.Error())
			}

			// Tending took longer than the requested tend interval.
			// Tending is too slow for the cluster and may be falling behind schedule.
			if tendDuration := time.Since(tm); tendDuration > clstr.clientPolicy.TendInterval {
				Logger.Warn("Tending took %s, while your requested ClientPolicy.TendInterval is %s. Tends are slower than the interval, and may be falling behind the changes in the cluster.", tendDuration, clstr.clientPolicy.TendInterval)
			}
		}
	}

	// cleanup code goes here
	// close the nodes
	nodeArray := clstr.GetNodes()
	for _, node := range nodeArray {
		node.Close()
	}
}

// AddSeeds adds new hosts to the cluster.
// They will be added to the cluster on the next tend call.
func (clstr *Cluster) AddSeeds(hosts []*Host) {
	clstr.seeds.Update(func(val interface{}) (interface{}, error) {
		seeds := val.([]*Host)
		seeds = append(seeds, hosts...)
		return seeds, nil
	})
}
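
// A short sketch of adding extra seed hosts at runtime (illustrative only;
// `cluster` is a placeholder for a value returned by NewCluster, NewHost is
// assumed from this package's public API, and the addresses are placeholders).
// The new seeds are picked up on the next tend cycle rather than immediately:
//
//	cluster.AddSeeds([]*Host{
//		NewHost("10.0.0.2", 3000),
//		NewHost("10.0.0.3", 3000),
//	})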

// tend updates the cluster state.
func (clstr *Cluster) tend() error {

	nodes := clstr.GetNodes()
	nodeCountBeforeTend := len(nodes)

	// All node additions/deletions are performed in the tend goroutine.
	// If no active nodes exist, seed the cluster.
	if len(nodes) == 0 {
		Logger.Info("No connections available; seeding...")
		if newNodesFound, err := clstr.seedNodes(); !newNodesFound {
			return err
		}

		// refresh the nodes list after seeding
		nodes = clstr.GetNodes()
	}

	peers := newPeers(len(nodes)+16, 16)

	floatSupport := true
	batchIndexSupport := true
	geoSupport := true

	for _, node := range nodes {
		// Clear node reference counts.
		node.referenceCount.Set(0)
		node.partitionChanged.Set(false)
		if !node.supportsPeers.Get() {
			peers.usePeers.Set(false)
		}
	}

	wg := sync.WaitGroup{}
	wg.Add(len(nodes))
	for _, node := range nodes {
		go func(node *Node) {
			defer wg.Done()
			if err := node.Refresh(peers); err != nil {
				Logger.Debug("Error occurred while refreshing node %s: %s", node.String(), err.Error())
			}
		}(node)
	}
	wg.Wait()

	// Refresh peers when necessary.
	if peers.usePeers.Get() && (peers.genChanged.Get() || len(peers.peers()) != nodeCountBeforeTend) {
		// Refresh peers for all nodes that responded the first time even if only one node's peers changed.
		peers.refreshCount.Set(0)

		wg.Add(len(nodes))
		for _, node := range nodes {
			go func(node *Node) {
				defer wg.Done()
				node.refreshPeers(peers)
			}(node)
		}
		wg.Wait()
	}

	var partitionMap partitionMap

	// Use the following function to allocate memory for the partitionMap on demand.
	// This avoids the allocation when the cluster is stable, and makes tend a bit faster.
	pmlock := new(sync.Mutex)
	setPartitionMap := func(l *sync.Mutex) {
		l.Lock()
		defer l.Unlock()
		if partitionMap == nil {
			partitionMap = clstr.getPartitions().clone()
		}
	}

	// find the first host that connects
	for _, _peer := range peers.peers() {
		if clstr.peerExists(peers, _peer.nodeName) {
			// Node already exists. Do not even try to connect to hosts.
			continue
		}

		wg.Add(1)
		go func(__peer *peer) {
			defer wg.Done()
			for _, host := range __peer.hosts {
				// attempt connection to the host
				nv := nodeValidator{}
				if err := nv.validateNode(clstr, host); err != nil {
					Logger.Warn("Add node `%s` failed: `%s`", host, err)
					continue
				}

				// Must look for the new node name in the unlikely event that node names do not agree.
				if __peer.nodeName != nv.name {
					Logger.Warn("Peer node `%s` is different from the actual node `%s` for host `%s`", __peer.nodeName, nv.name, host)
				}

				if clstr.peerExists(peers, nv.name) {
					// Node already exists. Do not even try to connect to hosts.
					break
				}

				// Create new node.
				node := clstr.createNode(&nv)
				peers.addNode(nv.name, node)
				setPartitionMap(pmlock)
				node.refreshPartitions(peers, partitionMap)
				break
			}
		}(_peer)
	}

	// Refresh the partition map when necessary.
	wg.Add(len(nodes))
	for _, node := range nodes {
		go func(node *Node) {
			defer wg.Done()
			if node.partitionChanged.Get() {
				setPartitionMap(pmlock)
				node.refreshPartitions(peers, partitionMap)
			}
		}(node)
	}

	// This waits for both of the steps above.
	wg.Wait()

	if peers.genChanged.Get() || !peers.usePeers.Get() {
		// Handle node changes determined from refreshes.
		removeList := clstr.findNodesToRemove(peers.refreshCount.Get())

		// Remove nodes in a batch.
		if len(removeList) > 0 {
			for _, n := range removeList {
				Logger.Debug("The following nodes will be removed: %s", n)
			}
			clstr.removeNodes(removeList)
		}

		clstr.aggregateNodestats(removeList)
	}

	// Add nodes in a batch.
	if len(peers.nodes()) > 0 {
		clstr.addNodes(peers.nodes())
	}

	if !floatSupport {
		Logger.Warn("Some cluster nodes do not support the float type. Disabling native float support in the client library...")
	}

	// set the cluster supported features
	clstr.supportsFloat.Set(floatSupport)
	clstr.supportsBatchIndex.Set(batchIndexSupport)
	clstr.supportsGeo.Set(geoSupport)

	// update all partitions in one go
	updatePartitionMap := false
	for _, node := range clstr.GetNodes() {
		if node.partitionChanged.Get() {
			updatePartitionMap = true
			break
		}
	}

	if updatePartitionMap {
		clstr.setPartitions(partitionMap)
	}

	if err := clstr.getPartitions().validate(); err != nil {
		Logger.Debug("Error validating the cluster partition map after tend: %s", err.Error())
	}

	// only log if the node count has changed
	if nodeCountBeforeTend != len(clstr.GetNodes()) {
		Logger.Info("Tend finished. Live node count changed from %d to %d", nodeCountBeforeTend, len(clstr.GetNodes()))
	}

	clstr.aggregateNodestats(clstr.GetNodes())

	return nil
}

func (clstr *Cluster) aggregateNodestats(nodeList []*Node) {
	// update stats
	clstr.statsLock.Lock()
	defer clstr.statsLock.Unlock()

	for _, node := range nodeList {
		h := node.host.String()
		if stats, exists := clstr.stats[h]; exists {
			stats.aggregate(node.stats.getAndReset())
		} else {
			clstr.stats[h] = node.stats.getAndReset()
		}
	}
}

func (clstr *Cluster) statsCopy() map[string]nodeStats {
	clstr.statsLock.Lock()
	defer clstr.statsLock.Unlock()

	res := make(map[string]nodeStats, len(clstr.stats))
	for _, node := range clstr.GetNodes() {
		h := node.host.String()
		if stats, exists := clstr.stats[h]; exists {
			statsCopy := stats.clone()
			statsCopy.ConnectionsOpen = int64(node.connectionCount.Get())
			res[h] = statsCopy
		}
	}

	// stats for nodes which do not exist anymore
	for h, stats := range clstr.stats {
		if _, exists := res[h]; !exists {
			stats.ConnectionsOpen = 0
			res[h] = stats.clone()
		}
	}

	return res
}

func (clstr *Cluster) peerExists(peers *peers, nodeName string) bool {
	node := clstr.findNodeByName(nodeName)
	if node != nil {
		node.referenceCount.IncrementAndGet()
		return true
	}

	node = peers.nodeByName(nodeName)
	if node != nil {
		node.referenceCount.IncrementAndGet()
		return true
	}

	return false
}

// waitTillStabilized tends the cluster until it has stabilized and returns control.
// This helps avoid initial database request timeout issues when
// a large number of threads are initiated at client startup.
//
// If the cluster has not stabilized by the timeout, return
// control as well. Do not return an error since future
// database requests may still succeed.
func (clstr *Cluster) waitTillStabilized() error {
	count := -1

	doneCh := make(chan error, 10)

	// will run until the cluster is stabilized
	go func() {
		var err error
		for {
			if err = clstr.tend(); err != nil {
				if aerr, ok := err.(AerospikeError); ok {
					switch aerr.ResultCode() {
					case NOT_AUTHENTICATED, CLUSTER_NAME_MISMATCH_ERROR:
						doneCh <- err
						return
					}
				}
				Logger.Warn(err.Error())
			}

			// // if there are no errors in connecting to the cluster, then validate the partition table
			// if err == nil {
			// 	err = clstr.getPartitions().validate()
			// }

			// Check to see if the cluster has changed since the last tend.
			// If not, assume the cluster has stabilized and return.
			if count == len(clstr.GetNodes()) {
				break
			}

			time.Sleep(time.Millisecond)

			count = len(clstr.GetNodes())
		}
		doneCh <- err
	}()

	select {
	case <-time.After(clstr.clientPolicy.Timeout):
		if clstr.clientPolicy.FailIfNotConnected {
			clstr.Close()
		}
		return errors.New("Connecting to the cluster timed out.")
	case err := <-doneCh:
		if err != nil && clstr.clientPolicy.FailIfNotConnected {
			clstr.Close()
		}
		return err
	}
}

func (clstr *Cluster) findAlias(alias *Host) *Node {
	res, _ := clstr.aliases.GetSyncedVia(func(val interface{}) (interface{}, error) {
		aliases := val.(map[Host]*Node)
		return aliases[*alias], nil
	})

	return res.(*Node)
}

func (clstr *Cluster) setPartitions(partMap partitionMap) {
	if err := partMap.validate(); err != nil {
		Logger.Error("Partition map error: %s.", err.Error())
	}

	clstr.partitionWriteMap.Store(partMap)
}

func (clstr *Cluster) getPartitions() partitionMap {
	return clstr.partitionWriteMap.Load().(partitionMap)
}

// discoverSeedIPs looks up the seed hosts and converts them
// to IP addresses.
func discoverSeedIPs(seeds []*Host) (res []*Host) {
	for _, seed := range seeds {
		addresses, err := net.LookupHost(seed.Name)
		if err != nil {
			continue
		}

		for i := range addresses {
			h := *seed
			h.Name = addresses[i]
			res = append(res, &h)
		}
	}

	return res
}

// seedNodes connects to the seed hosts and adds the discovered nodes to the cluster.
func (clstr *Cluster) seedNodes() (bool, error) {
	// Must copy array reference for copy on write semantics to work.
	seedArrayIfc, _ := clstr.seeds.GetSyncedVia(func(val interface{}) (interface{}, error) {
		seeds := val.([]*Host)
		seedsCopy := make([]*Host, len(seeds))
		copy(seedsCopy, seeds)

		return seedsCopy, nil
	})

	// discover seed IPs from DNS or load balancers
	seedArray := discoverSeedIPs(seedArrayIfc.([]*Host))

	successChan := make(chan struct{}, len(seedArray))
	errChan := make(chan error, len(seedArray))

	Logger.Info("Seeding the cluster. Seeds count: %d", len(seedArray))

	// Add all nodes at once to avoid copying the entire array multiple times.
	for _, seed := range seedArray {
		go func(seed *Host) {
			nodesToAdd := make(nodesToAddT, 128)
			nv := nodeValidator{}
			err := nv.seedNodes(clstr, seed, nodesToAdd)
			if err != nil {
				Logger.Warn("Seed %s failed: %s", seed.String(), err.Error())
				errChan <- err
				return
			}
			clstr.addNodes(nodesToAdd)
			successChan <- struct{}{}
		}(seed)
	}

	errorList := make([]error, 0, len(seedArray))
	seedCount := len(seedArray)
L:
	for {
		select {
		case err := <-errChan:
			errorList = append(errorList, err)
			seedCount--
			if seedCount <= 0 {
				break L
			}
		case <-successChan:
			// even one seed is enough
			return true, nil
		case <-time.After(clstr.clientPolicy.Timeout):
			// time is up, no seeds found
			break L
		}
	}

	var errStrs []string
	for _, err := range errorList {
		if err != nil {
			if aerr, ok := err.(AerospikeError); ok {
				switch aerr.ResultCode() {
				case NOT_AUTHENTICATED:
					return false, NewAerospikeError(NOT_AUTHENTICATED)
				case CLUSTER_NAME_MISMATCH_ERROR:
					return false, aerr
				}
			}
			errStrs = append(errStrs, err.Error())
		}
	}

	return false, NewAerospikeError(INVALID_NODE_ERROR, "Failed to connect to hosts: "+strings.Join(errStrs, "\n"))
}

func (clstr *Cluster) createNode(nv *nodeValidator) *Node {
	return newNode(clstr, nv)
}

// Finds a node by name in a list of nodes
func (clstr *Cluster) findNodeName(list []*Node, name string) bool {
	for _, node := range list {
		if node.GetName() == name {
			return true
		}
	}
	return false
}

func (clstr *Cluster) addAlias(host *Host, node *Node) {
	if host != nil && node != nil {
		clstr.aliases.Update(func(val interface{}) (interface{}, error) {
			aliases := val.(map[Host]*Node)
			aliases[*host] = node
			return aliases, nil
		})
	}
}

func (clstr *Cluster) removeAlias(alias *Host) {
	if alias != nil {
		clstr.aliases.Update(func(val interface{}) (interface{}, error) {
			aliases := val.(map[Host]*Node)
			delete(aliases, *alias)
			return aliases, nil
		})
	}
}

func (clstr *Cluster) findNodesToRemove(refreshCount int) []*Node {
	nodes := clstr.GetNodes()

	removeList := []*Node{}

	for _, node := range nodes {
		if !node.IsActive() {
			// Inactive nodes must be removed.
			removeList = append(removeList, node)
			continue
		}

		// Single node clusters rely on whether it responded to info requests.
		if refreshCount == 0 && node.failures.Get() >= 5 {
			// All node info requests failed and this node had 5 consecutive failures.
			// Remove node.  If no nodes are left, seeds will be tried in next cluster
			// tend iteration.
			removeList = append(removeList, node)
			continue
		}

		// Two node clusters require at least one successful refresh before removing.
		if len(nodes) > 1 && refreshCount >= 1 && node.referenceCount.Get() == 0 {
			// Node is not referenced by other nodes.
			// Check if node responded to info request.
			if node.failures.Get() == 0 {
				// Node is alive, but not referenced by other nodes.  Check if mapped.
				if !clstr.findNodeInPartitionMap(node) {
					// Node doesn't have any partitions mapped to it.
					// There is no point in keeping it in the cluster.
					removeList = append(removeList, node)
				}
			} else {
				// Node not responding. Remove it.
				removeList = append(removeList, node)
			}
		}
	}

	return removeList
}

func (clstr *Cluster) findNodeInPartitionMap(filter *Node) bool {
	partitionMap := clstr.getPartitions()

	for _, partitions := range partitionMap {
		for _, nodeArray := range partitions.Replicas {
			for _, node := range nodeArray {
				// Use reference equality for performance.
				if node == filter {
					return true
				}
			}
		}
	}
	return false
}

func (clstr *Cluster) addNodes(nodesToAdd map[string]*Node) {
	clstr.nodes.Update(func(val interface{}) (interface{}, error) {
		nodes := val.([]*Node)
		for _, node := range nodesToAdd {
			if node != nil && !clstr.findNodeName(nodes, node.name) {
				Logger.Debug("Adding node %s (%s) to the cluster.", node.name, node.host.String())
				nodes = append(nodes, node)
			}
		}

		nodesMap := make(map[string]*Node, len(nodes))
		nodesAliases := make(map[Host]*Node, len(nodes))
		for i := range nodes {
			nodesMap[nodes[i].name] = nodes[i]

			for _, alias := range nodes[i].GetAliases() {
				nodesAliases[*alias] = nodes[i]
			}
		}

		clstr.nodesMap.Set(nodesMap)
		clstr.aliases.Set(nodesAliases)

		return nodes, nil
	})
}

func (clstr *Cluster) removeNodes(nodesToRemove []*Node) {

	// There is no need to delete nodes from partitionWriteMap because the nodes
	// have already been set to inactive.

	// Cleanup node resources.
	for _, node := range nodesToRemove {
		// Remove node's aliases from cluster alias set.
		// Aliases are only used in tend goroutine, so synchronization is not necessary.
		clstr.aliases.Update(func(val interface{}) (interface{}, error) {
			aliases := val.(map[Host]*Node)
			for _, alias := range node.GetAliases() {
				delete(aliases, *alias)
			}
			return aliases, nil
		})

		clstr.nodesMap.Update(func(val interface{}) (interface{}, error) {
			nodesMap := val.(map[string]*Node)
			delete(nodesMap, node.name)
			return nodesMap, nil
		})

		node.Close()
	}

	// Remove all nodes at once to avoid copying entire array multiple times.
	clstr.nodes.Update(func(val interface{}) (interface{}, error) {
		nodes := val.([]*Node)
		nlist := make([]*Node, 0, len(nodes))
		nlist = append(nlist, nodes...)
		for i, n := range nlist {
			for _, ntr := range nodesToRemove {
				if ntr.Equals(n) {
					nlist[i] = nil
				}
			}
		}

		newNodes := make([]*Node, 0, len(nlist))
		for i := range nlist {
			if nlist[i] != nil {
				newNodes = append(newNodes, nlist[i])
			}
		}

		return newNodes, nil
	})
}

// IsConnected returns true if cluster has nodes and is not already closed.
func (clstr *Cluster) IsConnected() bool {
	// Must copy array reference for copy on write semantics to work.
	nodeArray := clstr.GetNodes()
	return (len(nodeArray) > 0) && !clstr.closed.Get()
}

// GetRandomNode returns a random node on the cluster
func (clstr *Cluster) GetRandomNode() (*Node, error) {
	// Must copy array reference for copy on write semantics to work.
	nodeArray := clstr.GetNodes()
	length := len(nodeArray)
	for i := 0; i < length; i++ {
		// Must handle concurrency with other non-tending goroutines, so nodeIndex is consistent.
		index := int(atomic.AddUint64(&clstr.nodeIndex, 1) % uint64(length))
		node := nodeArray[index]

		if node != nil && node.IsActive() {
			// Logger.Debug("Node `%s` is active. index=%d", node, index)
			return node, nil
		}
	}

	return nil, NewAerospikeError(INVALID_NODE_ERROR, "Cluster is empty.")
}

// GetNodes returns a list of all nodes in the cluster
func (clstr *Cluster) GetNodes() []*Node {
	// Must copy array reference for copy on write semantics to work.
	return clstr.nodes.Get().([]*Node)
}

// GetSeeds returns a list of all seed nodes in the cluster
func (clstr *Cluster) GetSeeds() []Host {
	res, _ := clstr.seeds.GetSyncedVia(func(val interface{}) (interface{}, error) {
		seeds := val.([]*Host)
		res := make([]Host, 0, len(seeds))
		for _, seed := range seeds {
			res = append(res, *seed)
		}

		return res, nil
	})

	return res.([]Host)
}

// GetAliases returns a list of all node aliases in the cluster
func (clstr *Cluster) GetAliases() map[Host]*Node {
	res, _ := clstr.aliases.GetSyncedVia(func(val interface{}) (interface{}, error) {
		aliases := val.(map[Host]*Node)
		res := make(map[Host]*Node, len(aliases))
		for h, n := range aliases {
			res[h] = n
		}

		return res, nil
	})

	return res.(map[Host]*Node)
}

// GetNodeByName finds a node by name and returns an
// error if the node is not found.
func (clstr *Cluster) GetNodeByName(nodeName string) (*Node, error) {
	node := clstr.findNodeByName(nodeName)

	if node == nil {
		return nil, NewAerospikeError(INVALID_NODE_ERROR, "Invalid node name: "+nodeName)
	}
	return node, nil
}

func (clstr *Cluster) findNodeByName(nodeName string) *Node {
	// Must copy array reference for copy on write semantics to work.
	for _, node := range clstr.GetNodes() {
		if node.GetName() == nodeName {
			return node
		}
	}
	return nil
}

// Close closes all cached connections to the cluster nodes
// and stops the tend goroutine.
func (clstr *Cluster) Close() {
	if clstr.closed.CompareAndToggle(false) {
		// send close signal to maintenance channel
		close(clstr.tendChannel)

		// wait until tend is over
		clstr.wgTend.Wait()

		// remove node references from the partition table
		// to allow GC to work its magic. Leaks otherwise.
		clstr.getPartitions().cleanup()
	}
}
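
// A brief sketch of the shutdown flow (illustrative only; `cluster` is a
// placeholder for a value returned by NewCluster). Closing tendChannel makes
// clusterBoss leave its select loop and close every node, wgTend.Wait blocks
// until that goroutine is done, and the partition map cleanup drops the remaining
// node references for the garbage collector. The CompareAndToggle guard makes
// repeated calls harmless:
//
//	cluster.Close()
//	cluster.Close()                // no-op: the closed flag is already set
//	alive := cluster.IsConnected() // false from here on, since closed is set
//	_ = alive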

// MigrationInProgress determines if any node in the cluster
// is participating in a data migration.
func (clstr *Cluster) MigrationInProgress(timeout time.Duration) (res bool, err error) {
	if timeout <= 0 {
		timeout = _DEFAULT_TIMEOUT
	}

	done := make(chan bool, 1)

	go func() {
		// this function is guaranteed to return after _DEFAULT_TIMEOUT
		nodes := clstr.GetNodes()
		for _, node := range nodes {
			if node.IsActive() {
				if res, err = node.MigrationInProgress(); res || err != nil {
					done <- true
					return
				}
			}
		}

		res, err = false, nil
		done <- false
	}()

	deadline := time.After(timeout)
	select {
	case <-deadline:
		return false, NewAerospikeError(TIMEOUT)
	case <-done:
		return res, err
	}
}
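
// A small usage sketch for MigrationInProgress (illustrative only; `cluster` is a
// placeholder for a value returned by NewCluster and the timeout is arbitrary):
//
//	migrating, err := cluster.MigrationInProgress(10 * time.Second)
//	if err != nil {
//		// a node reported an error, or the check itself timed out
//	} else if migrating {
//		// at least one active node reported a data migration in progress
//	}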

// WaitUntillMigrationIsFinished blocks until all
// migration operations in the cluster have finished.
func (clstr *Cluster) WaitUntillMigrationIsFinished(timeout time.Duration) (err error) {
	if timeout <= 0 {
		timeout = _NO_TIMEOUT
	}
	done := make(chan error, 1)

	go func() {
		// this function is guaranteed to return after timeout
		// no goroutines will be leaked
		for {
			if res, err := clstr.MigrationInProgress(timeout); err != nil || !res {
				done <- err
				return
			}
		}
	}()

	deadline := time.After(timeout)
	select {
	case <-deadline:
		return NewAerospikeError(TIMEOUT)
	case err = <-done:
		return err
	}
}

// Password returns the password that is currently used with the cluster.
func (clstr *Cluster) Password() (res []byte) {
	pass := clstr.password.Get()
	if pass != nil {
		return pass.([]byte)
	}
	return nil
}

func (clstr *Cluster) changePassword(user string, password string, hash []byte) {
	// change password ONLY if the user is the same
	if clstr.user == user {
		clstr.clientPolicy.Password = password
		clstr.password.Set(hash)
	}
}

// ClientPolicy returns the client policy that is currently used with the cluster.
func (clstr *Cluster) ClientPolicy() (res ClientPolicy) {
	return clstr.clientPolicy
}

// WarmUp fills the connection pool with connections for all nodes.
// This is necessary on startup for high traffic programs.
// If the count is <= 0, the connection queue will be filled.
// If the count is larger than the pool size, the pool will only be filled up to its size.
// Note: One connection per node is reserved for tend operations and is not used for transactions.
func (clstr *Cluster) WarmUp(count int) (int, error) {
	var g errgroup.Group
	cnt := NewAtomicInt(0)
	nodes := clstr.GetNodes()
	for i := range nodes {
		node := nodes[i]
		g.Go(func() error {
			n, err := node.WarmUp(count)
			cnt.AddAndGet(n)

			return err
		})
	}

	err := g.Wait()
	return cnt.Get(), err
}
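
// A usage sketch for WarmUp, typically run once at startup before serving traffic
// (illustrative only; `cluster` is a placeholder for a value returned by NewCluster).
// Passing 0 asks every node to fill its connection queue, and the returned count is
// the sum of connections opened across all nodes:
//
//	opened, err := cluster.WarmUp(0)
//	if err != nil {
//		Logger.Warn("Connection warm-up was incomplete: %s", err.Error())
//	}
//	Logger.Info("Opened %d connections during warm-up.", opened)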