// Copyright 2013-2020 Aerospike, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aerospike

import (
	"bufio"
	"io"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"

	. "github.com/aerospike/aerospike-client-go/internal/atomic"
	. "github.com/aerospike/aerospike-client-go/logger"
	. "github.com/aerospike/aerospike-client-go/types"
)

const (
	_PARTITIONS = 4096
)
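
// Note: Aerospike namespaces are always divided into 4096 partitions; the
// partition maps parsed in this file are sized accordingly.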

// Node represents an Aerospike Database Server Node
type Node struct {
	cluster            *Cluster
	name               string
	host               *Host
	aliases            atomic.Value //[]*Host
	stats              nodeStats
	_sessionToken      atomic.Value //[]byte
	_sessionExpiration atomic.Value //time.Time

	racks atomic.Value //map[string]int

	// tendConn reserves a connection for the tend goroutine so that tend
	// never has to wait in the queue for a connection; waiting there would
	// cause starvation and get the node dropped under load.
	tendConn     *Connection
	tendConnLock sync.Mutex // All uses of the tend connection must be synchronized

	peersGeneration AtomicInt
	peersCount      AtomicInt

	connections     connectionHeap
	connectionCount AtomicInt
	health          AtomicInt

	partitionGeneration AtomicInt
	referenceCount      AtomicInt
	failures            AtomicInt
	partitionChanged    AtomicBool

	active AtomicBool

	supportsFloat, supportsBatchIndex, supportsReplicas, supportsGeo, supportsPeers, supportsLUTNow, supportsTruncateNamespace, supportsClusterStable, supportsBitwiseOps AtomicBool
}

// newNode initializes a server node with connection parameters.
func newNode(cluster *Cluster, nv *nodeValidator) *Node {
	newNode := &Node{
		cluster: cluster,
		name:    nv.name,
		// address: nv.primaryAddress,
		host: nv.primaryHost,

		// Assign host to first IP alias because the server identifies nodes
		// by IP address (not hostname).
		connections:         *newConnectionHeap(cluster.clientPolicy.MinConnectionsPerNode, cluster.clientPolicy.ConnectionQueueSize),
		connectionCount:     *NewAtomicInt(0),
		peersGeneration:     *NewAtomicInt(-1),
		partitionGeneration: *NewAtomicInt(-2),
		referenceCount:      *NewAtomicInt(0),
		failures:            *NewAtomicInt(0),
		active:              *NewAtomicBool(true),
		partitionChanged:    *NewAtomicBool(false),

		supportsFloat:             *NewAtomicBool(nv.supportsFloat),
		supportsBatchIndex:        *NewAtomicBool(nv.supportsBatchIndex),
		supportsReplicas:          *NewAtomicBool(nv.supportsReplicas),
		supportsGeo:               *NewAtomicBool(nv.supportsGeo),
		supportsPeers:             *NewAtomicBool(nv.supportsPeers),
		supportsLUTNow:            *NewAtomicBool(nv.supportsLUTNow),
		supportsTruncateNamespace: *NewAtomicBool(nv.supportsTruncateNamespace),
		supportsClusterStable:     *NewAtomicBool(nv.supportsClusterStable),
		supportsBitwiseOps:        *NewAtomicBool(nv.supportsBitwiseOps),
	}

	newNode.aliases.Store(nv.aliases)
	newNode._sessionToken.Store(nv.sessionToken)
	newNode.racks.Store(map[string]int{})

	// This counter is reset to zero on the first stats aggregation on the
	// cluster, so the node addition is only counted once.
	atomic.AddInt64(&newNode.stats.NodeAdded, 1)

	return newNode
}

// Refresh requests the current status from the server node, and updates the node with the result.
func (nd *Node) Refresh(peers *peers) error {
	if !nd.active.Get() {
		return nil
	}

	atomic.AddInt64(&nd.stats.TendsTotal, 1)

	// Close idle connections
	defer nd.dropIdleConnections()

	nd.referenceCount.Set(0)

	var infoMap map[string]string
	var err error
	if peers.usePeers.Get() {
		commands := []string{"node", "peers-generation", "partition-generation"}
		if nd.cluster.clientPolicy.RackAware {
			commands = append(commands, "racks:")
		}

		infoMap, err = nd.RequestInfo(&nd.cluster.infoPolicy, commands...)
		if err != nil {
			nd.refreshFailed(err)
			return err
		}

		if err := nd.verifyNodeName(infoMap); err != nil {
			nd.refreshFailed(err)
			return err
		}

		if err := nd.verifyPeersGeneration(infoMap, peers); err != nil {
			nd.refreshFailed(err)
			return err
		}

		if err := nd.verifyPartitionGeneration(infoMap); err != nil {
			nd.refreshFailed(err)
			return err
		}
	} else {
		commands := []string{"node", "partition-generation", nd.cluster.clientPolicy.servicesString()}
		if nd.cluster.clientPolicy.RackAware {
			commands = append(commands, "racks:")
		}

		infoMap, err = nd.RequestInfo(&nd.cluster.infoPolicy, commands...)
		if err != nil {
			nd.refreshFailed(err)
			return err
		}

		if err := nd.verifyNodeName(infoMap); err != nil {
			nd.refreshFailed(err)
			return err
		}

		if err = nd.verifyPartitionGeneration(infoMap); err != nil {
			nd.refreshFailed(err)
			return err
		}

		if err = nd.addFriends(infoMap, peers); err != nil {
			nd.refreshFailed(err)
			return err
		}
	}

	if err := nd.updateRackInfo(infoMap); err != nil {
		// Updating rack info fails the refresh only if the feature is not
		// supported on the server.
		if aerr, ok := err.(AerospikeError); ok && aerr.ResultCode() == UNSUPPORTED_FEATURE {
			nd.refreshFailed(err)
			return err
		}
		// Other errors should not fail the refresh.
		Logger.Warn("Updating node rack info failed with error: %s (racks: `%s`)", err, infoMap["racks:"])
	}

	nd.failures.Set(0)
	peers.refreshCount.IncrementAndGet()
	nd.referenceCount.IncrementAndGet()
	atomic.AddInt64(&nd.stats.TendsSuccessful, 1)

	if err := nd.refreshSessionToken(); err != nil {
		Logger.Error("Error refreshing session token: %s", err.Error())
	}

	if _, err := nd.fillMinConns(); err != nil {
		Logger.Error("Error filling up the connection queue to the minimum required: %s", err.Error())
	}

	return nil
}

// refreshSessionToken refreshes the session token if it has expired.
func (nd *Node) refreshSessionToken() error {
	// Session tokens are only used with external authentication.
	if !nd.cluster.clientPolicy.RequiresAuthentication() || nd.cluster.clientPolicy.AuthMode != AuthModeExternal {
		return nil
	}

	var deadline time.Time
	deadlineIfc := nd._sessionExpiration.Load()
	if deadlineIfc != nil {
		deadline = deadlineIfc.(time.Time)
	}

	if deadline.IsZero() || time.Now().Before(deadline) {
		return nil
	}

	nd.tendConnLock.Lock()
	defer nd.tendConnLock.Unlock()

	if err := nd.initTendConn(nd.cluster.clientPolicy.LoginTimeout); err != nil {
		return err
	}

	command := newLoginCommand(nd.tendConn.dataBuffer)
	if err := command.login(&nd.cluster.clientPolicy, nd.tendConn, nd.cluster.Password()); err != nil {
		// Socket not authenticated. Do not put back into pool.
		nd.tendConn.Close()
		return err
	}

	nd._sessionToken.Store(command.SessionToken)
	nd._sessionExpiration.Store(command.SessionExpiration)

	return nil
}

func (nd *Node) updateRackInfo(infoMap map[string]string) error {
	if !nd.cluster.clientPolicy.RackAware {
		return nil
	}

	// Return UNSUPPORTED_FEATURE if the server does not support rack-aware;
	// the caller treats this error as fatal for the refresh, while all other
	// errors only produce a warning.
	if strings.HasPrefix(strings.ToUpper(infoMap["racks:"]), "ERROR") {
		return NewAerospikeError(UNSUPPORTED_FEATURE, "You have set the ClientPolicy.RackAware = true, but the server does not support this feature.")
	}

	ss := strings.Split(infoMap["racks:"], ";")
	racks := map[string]int{}
	for _, s := range ss {
		in := bufio.NewReader(strings.NewReader(s))
		_, err := in.ReadString('=')
		if err != nil {
			return err
		}

		ns, err := in.ReadString(':')
		if err != nil {
			return err
		}

		for {
			_, err = in.ReadString('_')
			if err != nil {
				return err
			}

			rackStr, err := in.ReadString('=')
			if err != nil {
				return err
			}

			rack, err := strconv.Atoi(rackStr[:len(rackStr)-1])
			if err != nil {
				return err
			}

			nodesList, err := in.ReadString(':')
			if err != nil && err != io.EOF {
				return err
			}

			nodes := strings.Split(strings.Trim(nodesList, ":"), ",")
			for i := range nodes {
				if nodes[i] == nd.name {
					racks[ns[:len(ns)-1]] = rack
				}
			}

			if err == io.EOF {
				break
			}
		}
	}

	nd.racks.Store(racks)

	return nil
}
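
// The parser above assumes a `racks:` payload of the following shape (an
// illustrative value, not captured from a live server):
//
//	ns=test:rack_1=BB9020011AC4202,BB9030011AC4202:rack_2=BB9040011AC4202;ns=bar:rack_1=BB9020011AC4202
//
// i.e. a semicolon-separated list of namespace entries, each holding
// colon-separated `rack_<id>=<node>,<node>` groups. The hypothetical helper
// below re-derives the same namespace-to-rack map with strings.Split instead
// of a bufio.Reader; it is a readability sketch, not part of the client.
func exampleParseRacks(racksInfo, nodeName string) map[string]int {
	racks := map[string]int{}
	for _, nsEntry := range strings.Split(racksInfo, ";") {
		parts := strings.Split(nsEntry, ":")
		if len(parts) < 2 || !strings.HasPrefix(parts[0], "ns=") {
			continue
		}
		ns := strings.TrimPrefix(parts[0], "ns=")
		for _, group := range parts[1:] {
			kv := strings.SplitN(group, "=", 2)
			if len(kv) != 2 || !strings.HasPrefix(kv[0], "rack_") {
				continue
			}
			rack, err := strconv.Atoi(strings.TrimPrefix(kv[0], "rack_"))
			if err != nil {
				continue
			}
			// Record the rack only if this node appears in the node list.
			for _, n := range strings.Split(kv[1], ",") {
				if n == nodeName {
					racks[ns] = rack
				}
			}
		}
	}
	return racks
}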

func (nd *Node) verifyNodeName(infoMap map[string]string) error {
	infoName, exists := infoMap["node"]

	if !exists || len(infoName) == 0 {
		return NewAerospikeError(INVALID_NODE_ERROR, "Node name is empty")
	}

	if nd.name != infoName {
		// Set node to inactive immediately.
		nd.active.Set(false)
		return NewAerospikeError(INVALID_NODE_ERROR, "Node name has changed. Old="+nd.name+" New="+infoName)
	}
	return nil
}

func (nd *Node) verifyPeersGeneration(infoMap map[string]string, peers *peers) error {
	genString := infoMap["peers-generation"]
	if len(genString) == 0 {
		return NewAerospikeError(PARSE_ERROR, "peers-generation is empty")
	}

	gen, err := strconv.Atoi(genString)
	if err != nil {
		return NewAerospikeError(PARSE_ERROR, "peers-generation is not a number: "+genString)
	}

	peers.genChanged.Or(nd.peersGeneration.Get() != gen)
	return nil
}

func (nd *Node) verifyPartitionGeneration(infoMap map[string]string) error {
	genString := infoMap["partition-generation"]

	if len(genString) == 0 {
		return NewAerospikeError(PARSE_ERROR, "partition-generation is empty")
	}

	gen, err := strconv.Atoi(genString)
	if err != nil {
		return NewAerospikeError(PARSE_ERROR, "partition-generation is not a number: "+genString)
	}

	if nd.partitionGeneration.Get() != gen {
		nd.partitionChanged.Set(true)
	}
	return nil
}

func (nd *Node) addFriends(infoMap map[string]string, peers *peers) error {
	friendString, exists := infoMap[nd.cluster.clientPolicy.servicesString()]

	if !exists || len(friendString) == 0 {
		nd.peersCount.Set(0)
		return nil
	}

	friendNames := strings.Split(friendString, ";")
	nd.peersCount.Set(len(friendNames))

	for _, friend := range friendNames {
		friendInfo := strings.Split(friend, ":")

		if len(friendInfo) != 2 {
			Logger.Error("Node info from asinfo:services is malformed. Expected HOST:PORT, but got `%s`", friend)
			continue
		}

		hostName := friendInfo[0]
		port, err := strconv.Atoi(friendInfo[1])
		if err != nil {
			Logger.Error("Node info from asinfo:services is malformed. Port is not a number: `%s`", friend)
			continue
		}

		if len(nd.cluster.clientPolicy.IpMap) > 0 {
			if alternativeHost, ok := nd.cluster.clientPolicy.IpMap[hostName]; ok {
				hostName = alternativeHost
			}
		}

		host := NewHost(hostName, port)
		node := nd.cluster.findAlias(host)

		if node != nil {
			node.referenceCount.IncrementAndGet()
		} else {
			if !peers.hostExists(*host) {
				nd.prepareFriend(host, peers)
			}
		}
	}

	return nil
}
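
// Illustrative note (assumed shape, not captured from a live server): the
// services info value parsed above is a semicolon-separated list of peer
// addresses, e.g. "192.168.1.10:3000;192.168.1.11:3000". Addresses found in
// ClientPolicy.IpMap are translated before the peer is looked up or validated.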

func (nd *Node) prepareFriend(host *Host, peers *peers) bool {
	nv := &nodeValidator{}
	if err := nv.validateNode(nd.cluster, host); err != nil {
		Logger.Warn("Adding node `%s` failed: %s", host, err)
		return false
	}

	node := peers.nodeByName(nv.name)

	if node != nil {
		// Duplicate node name found. This usually occurs when the server
		// services list contains both internal and external IP addresses
		// for the same node.
		peers.addHost(*host)
		node.addAlias(host)
		return true
	}

	// Check for duplicate nodes in cluster.
	node = nd.cluster.nodesMap.Get().(map[string]*Node)[nv.name]

	if node != nil {
		peers.addHost(*host)
		node.addAlias(host)
		node.referenceCount.IncrementAndGet()
		nd.cluster.addAlias(host, node)
		return true
	}

	node = nd.cluster.createNode(nv)
	peers.addHost(*host)
	peers.addNode(nv.name, node)
	return true
}

func (nd *Node) refreshPeers(peers *peers) {
	// Do not refresh peers when node connection has already failed during this cluster tend iteration.
	if nd.failures.Get() > 0 || !nd.active.Get() {
		return
	}

	peerParser, err := parsePeers(nd.cluster, nd)
	if err != nil {
		Logger.Debug("Parsing peers failed: %s", err)
		nd.refreshFailed(err)
		return
	}

	peers.appendPeers(peerParser.peers)
	nd.peersGeneration.Set(int(peerParser.generation()))
	nd.peersCount.Set(len(peers.peers()))
	peers.refreshCount.IncrementAndGet()
}

func (nd *Node) refreshPartitions(peers *peers, partitions partitionMap) {
	// Do not refresh partitions when node connection has already failed during this cluster tend iteration.
	// Also, avoid the "split cluster" case where this node thinks it's a 1-node cluster.
	// Unchecked, such a node can dominate the partition map and cause all other
	// nodes to be dropped.
	if nd.failures.Get() > 0 || !nd.active.Get() || (nd.peersCount.Get() == 0 && peers.refreshCount.Get() > 1) {
		return
	}

	parser, err := newPartitionParser(nd, partitions, _PARTITIONS)
	if err != nil {
		nd.refreshFailed(err)
		return
	}

	if parser.generation != nd.partitionGeneration.Get() {
		Logger.Info("Node %s partition generation changed from %d to %d", nd.host.String(), nd.partitionGeneration.Get(), parser.generation)
		nd.partitionChanged.Set(true)
		nd.partitionGeneration.Set(parser.generation)
		atomic.AddInt64(&nd.stats.PartitionMapUpdates, 1)
	}
}

func (nd *Node) refreshFailed(e error) {
	nd.failures.IncrementAndGet()
	atomic.AddInt64(&nd.stats.TendsFailed, 1)

	// Only log message if cluster is still active.
	if nd.cluster.IsConnected() {
		Logger.Warn("Node `%s` refresh failed: `%s`", nd, e)
	}
}

// dropIdleConnections polls connections from the head of the connection pool
// queue; if a connection is idle, it is dropped and the next one is checked,
// until a fresh connection is found or the queue is exhausted.
func (nd *Node) dropIdleConnections() {
	nd.connections.DropIdle()
}

// GetConnection gets a connection to the node.
// If no pooled connection is available, a new connection will be created, unless
// ClientPolicy.ConnectionQueueSize number of connections are already created.
// This method will retry to retrieve a connection in case the connection pool
// is empty, until the timeout is reached.
func (nd *Node) GetConnection(timeout time.Duration) (conn *Connection, err error) {
	if timeout <= 0 {
		timeout = _DEFAULT_TIMEOUT
	}
	deadline := time.Now().Add(timeout)

	for time.Now().Before(deadline) {
		conn, err = nd.getConnection(deadline, timeout)
		if err == nil && conn != nil {
			return conn, nil
		}

		if err == ErrServerNotAvailable {
			return nil, err
		}

		time.Sleep(5 * time.Millisecond)
	}

	// in case the loop didn't run at all
	if err == nil {
		err = ErrConnectionPoolEmpty
	}

	return nil, err
}
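
// A minimal usage sketch (hypothetical, not part of the client API): callers
// check a connection out, use it, and either return it to the pool or
// invalidate it, so a broken socket is never reused.
func exampleUseConnection(nd *Node, deadline time.Time, timeout time.Duration) error {
	conn, err := nd.GetConnection(timeout)
	if err != nil {
		return err
	}

	if err := conn.SetTimeout(deadline, timeout); err != nil {
		// Do not pool a connection in an unknown state.
		nd.InvalidateConnection(conn)
		return err
	}

	// ... perform a transaction on conn here ...

	// Healthy connections go back into the pool for reuse.
	nd.PutConnection(conn)
	return nil
}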

// getConnection gets a connection to the node.
// If no pooled connection is available, a new connection will be created.
func (nd *Node) getConnection(deadline time.Time, timeout time.Duration) (conn *Connection, err error) {
	return nd.getConnectionWithHint(deadline, timeout, 0)
}

// newConnection will make a new connection for the node.
func (nd *Node) newConnection(overrideThreshold bool) (*Connection, error) {
	if !nd.active.Get() {
		return nil, ErrServerNotAvailable
	}

	// if connection count is limited and enough connections are already created, don't create a new one
	cc := nd.connectionCount.IncrementAndGet()
	if nd.cluster.clientPolicy.LimitConnectionsToQueueSize && cc > nd.cluster.clientPolicy.ConnectionQueueSize {
		nd.connectionCount.DecrementAndGet()
		atomic.AddInt64(&nd.stats.ConnectionsPoolEmpty, 1)

		return nil, ErrTooManyConnectionsForNode
	}

	// Check for opening connection threshold
	if !overrideThreshold && nd.cluster.clientPolicy.OpeningConnectionThreshold > 0 {
		ct := nd.cluster.connectionThreshold.IncrementAndGet()
		if ct > nd.cluster.clientPolicy.OpeningConnectionThreshold {
			nd.cluster.connectionThreshold.DecrementAndGet()
			nd.connectionCount.DecrementAndGet()

			return nil, ErrTooManyOpeningConnections
		}

		defer nd.cluster.connectionThreshold.DecrementAndGet()
	}

	atomic.AddInt64(&nd.stats.ConnectionsAttempts, 1)
	conn, err := NewConnection(&nd.cluster.clientPolicy, nd.host)
	if err != nil {
		nd.connectionCount.DecrementAndGet()
		atomic.AddInt64(&nd.stats.ConnectionsFailed, 1)
		return nil, err
	}
	conn.node = nd

	// need to authenticate
	if err = conn.login(&nd.cluster.clientPolicy, nd.cluster.Password(), nd.sessionToken()); err != nil {
		atomic.AddInt64(&nd.stats.ConnectionsFailed, 1)

		// Socket not authenticated. Do not put back into pool.
		conn.Close()
		return nil, err
	}

	atomic.AddInt64(&nd.stats.ConnectionsSuccessful, 1)
	conn.setIdleTimeout(nd.cluster.clientPolicy.IdleTimeout)

	return conn, nil
}

// makeConnectionForPool tries to open a single new connection and put it into
// the connection pool. Failures are only logged, since this runs in a
// fire-and-forget goroutine.
func (nd *Node) makeConnectionForPool(hint byte) {
	conn, err := nd.newConnection(false)
	if err != nil {
		Logger.Debug("Error trying to make a connection to the node %s: %s", nd.String(), err.Error())
		return
	}

	nd.putConnectionWithHint(conn, hint)
}

// getConnectionWithHint gets a connection to the node.
// If no pooled connection is available, a new connection will be created.
func (nd *Node) getConnectionWithHint(deadline time.Time, timeout time.Duration, hint byte) (conn *Connection, err error) {
	if !nd.active.Get() {
		return nil, ErrServerNotAvailable
	}

	// try to get a valid connection from the connection pool
	for conn = nd.connections.Poll(hint); conn != nil; conn = nd.connections.Poll(hint) {
		if conn.IsConnected() {
			break
		}
		conn.Close()
		conn = nil
	}

	if conn == nil {
		go nd.makeConnectionForPool(hint)
		return nil, ErrConnectionPoolEmpty
	}

	if err = conn.SetTimeout(deadline, timeout); err != nil {
		atomic.AddInt64(&nd.stats.ConnectionsFailed, 1)

		// Do not put back into pool.
		conn.Close()
		return nil, err
	}

	conn.refresh()

	return conn, nil
}

// putConnectionWithHint puts a connection back into the pool.
// If the connection pool is full or the node is inactive, the connection
// will be closed and discarded.
func (nd *Node) putConnectionWithHint(conn *Connection, hint byte) bool {
	conn.refresh()
	if !nd.active.Get() || !nd.connections.Offer(conn, hint) {
		conn.Close()
		return false
	}
	return true
}

// PutConnection puts a connection back into the pool.
// If the connection pool is full, the connection will be
// closed and discarded.
func (nd *Node) PutConnection(conn *Connection) {
	nd.putConnectionWithHint(conn, 0)
}

// InvalidateConnection closes and discards a connection from the pool.
func (nd *Node) InvalidateConnection(conn *Connection) {
	conn.Close()
}

// GetHost retrieves host for the node.
func (nd *Node) GetHost() *Host {
	return nd.host
}

// IsActive checks if the node is active.
func (nd *Node) IsActive() bool {
	return nd != nil && nd.active.Get() && nd.partitionGeneration.Get() >= -1
}

// GetName returns node name.
func (nd *Node) GetName() string {
	return nd.name
}

// GetAliases returns node aliases.
func (nd *Node) GetAliases() []*Host {
	return nd.aliases.Load().([]*Host)
}

// setAliases sets the node aliases.
func (nd *Node) setAliases(aliases []*Host) {
	nd.aliases.Store(aliases)
}

// addAlias adds an alias for the node.
func (nd *Node) addAlias(aliasToAdd *Host) {
	// Aliases are only referenced in the cluster tend goroutine,
	// so synchronization is not necessary.
	aliases := nd.GetAliases()
	if aliases == nil {
		aliases = []*Host{}
	}

	aliases = append(aliases, aliasToAdd)
	nd.setAliases(aliases)
}

// Close marks the node as inactive and closes all of its pooled connections.
func (nd *Node) Close() {
	if nd.active.Get() {
		nd.active.Set(false)
		atomic.AddInt64(&nd.stats.NodeRemoved, 1)
	}
	nd.closeConnections()
	nd.connections.cleanup()
}

// String implements the fmt.Stringer interface.
func (nd *Node) String() string {
	return nd.name + " " + nd.host.String()
}

func (nd *Node) closeConnections() {
	for conn := nd.connections.Poll(0); conn != nil; conn = nd.connections.Poll(0) {
		conn.Close()
	}

	// close the tend connection
	nd.tendConnLock.Lock()
	defer nd.tendConnLock.Unlock()
	if nd.tendConn != nil {
		nd.tendConn.Close()
	}
}

// Equals compares equality of two nodes based on their names.
func (nd *Node) Equals(other *Node) bool {
	return nd != nil && other != nil && (nd == other || nd.name == other.name)
}

// MigrationInProgress determines if the node is participating in a data migration.
func (nd *Node) MigrationInProgress() (bool, error) {
	values, err := nd.RequestStats(&nd.cluster.infoPolicy)
	if err != nil {
		return false, err
	}

	// if migrate_partitions_remaining exists and is not `0`, then migration is in progress
	if migration, exists := values["migrate_partitions_remaining"]; exists && migration != "0" {
		return true, nil
	}

	// migration not in progress
	return false, nil
}

// WaitUntillMigrationIsFinished will block until migration operations are finished.
func (nd *Node) WaitUntillMigrationIsFinished(timeout time.Duration) (err error) {
	if timeout <= 0 {
		timeout = _NO_TIMEOUT
	}

	// The channel is buffered so the polling goroutine can always deliver
	// its result and exit, even after the timeout case below has fired;
	// this guarantees no goroutine is leaked.
	done := make(chan error, 1)

	go func() {
		for {
			if res, err := nd.MigrationInProgress(); err != nil || !res {
				done <- err
				return
			}
		}
	}()

	deadline := time.After(timeout)
	select {
	case <-deadline:
		return NewAerospikeError(TIMEOUT)
	case err = <-done:
		return err
	}
}
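
// Hypothetical usage sketch (not part of the client API): block for up to a
// minute until this node reports no outstanding partition migrations.
func exampleWaitForMigrations(nd *Node) {
	if err := nd.WaitUntillMigrationIsFinished(time.Minute); err != nil {
		Logger.Warn("Migrations still in progress on node %s: %s", nd.String(), err.Error())
	}
}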

// initTendConn sets up a connection to be used for info requests.
// The same connection will be used for tend.
func (nd *Node) initTendConn(timeout time.Duration) error {
	if timeout <= 0 {
		timeout = _DEFAULT_TIMEOUT
	}
	deadline := time.Now().Add(timeout)

	if nd.tendConn == nil || !nd.tendConn.IsConnected() {
		var tendConn *Connection
		var err error
		if nd.connectionCount.Get() == 0 {
			// if there are no connections in the pool, create a new connection synchronously.
			// this will make sure the initial tend will get a connection without multiple retries.
			tendConn, err = nd.newConnection(true)
		} else {
			tendConn, err = nd.GetConnection(timeout)
		}

		if err != nil {
			return err
		}
		nd.tendConn = tendConn
	}

	// Set timeout for tend conn
	return nd.tendConn.SetTimeout(deadline, timeout)
}

// requestInfoWithRetry gets info values by name from the specified database server node.
// It will try up to n times before returning the last error.
func (nd *Node) requestInfoWithRetry(policy *InfoPolicy, n int, name ...string) (res map[string]string, err error) {
	for i := 0; i < n; i++ {
		if res, err = nd.requestInfo(policy.Timeout, name...); err == nil {
			return res, nil
		}

		Logger.Error("Error occurred while fetching info from the server node %s: %s", nd.host.String(), err.Error())
		time.Sleep(100 * time.Millisecond)
	}

	// return the last error
	return nil, err
}

// RequestInfo gets info values by name from the specified database server node.
func (nd *Node) RequestInfo(policy *InfoPolicy, name ...string) (map[string]string, error) {
	return nd.requestInfo(policy.Timeout, name...)
}

// requestInfo gets info values by name from the specified database server node.
func (nd *Node) requestInfo(timeout time.Duration, name ...string) (map[string]string, error) {
	nd.tendConnLock.Lock()
	defer nd.tendConnLock.Unlock()

	if err := nd.initTendConn(timeout); err != nil {
		return nil, err
	}

	response, err := RequestInfo(nd.tendConn, name...)
	if err != nil {
		nd.tendConn.Close()
		return nil, err
	}
	return response, nil
}

// requestRawInfo gets info values by name from the specified database server node.
// It won't parse the results.
func (nd *Node) requestRawInfo(policy *InfoPolicy, name ...string) (*info, error) {
	nd.tendConnLock.Lock()
	defer nd.tendConnLock.Unlock()

	if err := nd.initTendConn(policy.Timeout); err != nil {
		return nil, err
	}

	response, err := newInfo(nd.tendConn, name...)
	if err != nil {
		nd.tendConn.Close()
		return nil, err
	}
	return response, nil
}

// RequestStats returns statistics for the specified node as a map.
func (nd *Node) RequestStats(policy *InfoPolicy) (map[string]string, error) {
	infoMap, err := nd.RequestInfo(policy, "statistics")
	if err != nil {
		return nil, err
	}

	res := map[string]string{}

	v, exists := infoMap["statistics"]
	if !exists {
		return res, nil
	}

	values := strings.Split(v, ";")
	for i := range values {
		kv := strings.Split(values[i], "=")
		if len(kv) > 1 {
			res[kv[0]] = kv[1]
		}
	}

	return res, nil
}
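
// Hypothetical usage sketch (not part of the client API): fetch the node's
// statistics map and parse a single metric. The stat key below is an
// assumption for illustration; keys and values arrive as plain strings.
func exampleClientConnections(nd *Node) (int, error) {
	stats, err := nd.RequestStats(&nd.cluster.infoPolicy)
	if err != nil {
		return 0, err
	}
	return strconv.Atoi(stats["client_connections"])
}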

// sessionToken returns the session token for the node.
// It will return nil if the session has expired.
func (nd *Node) sessionToken() []byte {
	var deadline time.Time
	deadlineIfc := nd._sessionExpiration.Load()
	if deadlineIfc != nil {
		deadline = deadlineIfc.(time.Time)
	}

	if deadline.IsZero() || time.Now().After(deadline) {
		return nil
	}

	st := nd._sessionToken.Load()
	if st != nil {
		return st.([]byte)
	}
	return nil
}

// Rack returns the rack number for the namespace.
func (nd *Node) Rack(namespace string) (int, error) {
	racks := nd.racks.Load().(map[string]int)
	v, exists := racks[namespace]

	if exists {
		return v, nil
	}

	return -1, newAerospikeNodeError(nd, RACK_NOT_DEFINED)
}

// hasRack reports whether the node serves the namespace from the given rack.
func (nd *Node) hasRack(namespace string, rack int) bool {
	racks := nd.racks.Load().(map[string]int)
	v, exists := racks[namespace]

	if !exists {
		return false
	}

	return v == rack
}
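
// Hypothetical usage sketch (not part of the client API): look up the rack
// this node serves a namespace from; Rack returns a RACK_NOT_DEFINED error
// when the server reported no rack for that namespace.
func exampleNodeRack(nd *Node, namespace string) {
	if rack, err := nd.Rack(namespace); err == nil {
		Logger.Info("Node %s serves namespace %s from rack %d", nd.String(), namespace, rack)
	}
}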

// WarmUp fills the node's connection pool with connections.
// This is necessary on startup for high-traffic programs.
// If the count is <= 0, the connection pool will be filled to capacity.
// If the count exceeds the remaining capacity of the pool, only the
// remaining capacity will be filled.
// Note: One connection per node is reserved for tend operations and is not used for transactions.
func (nd *Node) WarmUp(count int) (int, error) {
	var g errgroup.Group
	cnt := NewAtomicInt(0)

	toAlloc := nd.connections.Cap() - nd.connectionCount.Get()
	if count < toAlloc && count > 0 {
		toAlloc = count
	}

	for i := 0; i < toAlloc; i++ {
		g.Go(func() error {
			conn, err := nd.newConnection(true)
			if err != nil {
				if err == ErrTooManyConnectionsForNode {
					return nil
				}
				return err
			}

			if nd.putConnectionWithHint(conn, 0) {
				cnt.IncrementAndGet()
			} else {
				conn.Close()
			}

			return nil
		})
	}

	err := g.Wait()
	return cnt.Get(), err
}
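
// Hypothetical usage sketch (not part of the client API): after the client
// connects, pre-open a batch of connections on every node so the first wave
// of transactions does not pay the connection-dial cost. The count of 50 is
// an arbitrary illustration.
func exampleWarmUpCluster(c *Cluster) {
	for _, n := range c.GetNodes() {
		if cnt, err := n.WarmUp(50); err != nil {
			Logger.Warn("Warm-up of node %s stopped after %d connections: %s", n.String(), cnt, err.Error())
		}
	}
}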

// fillMinConns fills the connection pool to the minimum required
// by ClientPolicy.MinConnectionsPerNode.
func (nd *Node) fillMinConns() (int, error) {
	if nd.cluster.clientPolicy.MinConnectionsPerNode > 0 {
		toFill := nd.cluster.clientPolicy.MinConnectionsPerNode - nd.connectionCount.Get()
		if toFill > 0 {
			return nd.WarmUp(toFill)
		}
	}
	return 0, nil
}