1// Copyright 2013-2020 Aerospike, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aerospike 16 17import ( 18 "bufio" 19 "io" 20 "strconv" 21 "strings" 22 "sync" 23 "sync/atomic" 24 "time" 25 26 "golang.org/x/sync/errgroup" 27 28 . "github.com/aerospike/aerospike-client-go/internal/atomic" 29 . "github.com/aerospike/aerospike-client-go/logger" 30 . "github.com/aerospike/aerospike-client-go/types" 31) 32 33const ( 34 _PARTITIONS = 4096 35) 36 37// Node represents an Aerospike Database Server Node 38type Node struct { 39 cluster *Cluster 40 name string 41 host *Host 42 aliases atomic.Value //[]*Host 43 stats nodeStats 44 _sessionToken atomic.Value //[]byte 45 _sessionExpiration atomic.Value //time.Time 46 47 racks atomic.Value //map[string]int 48 49 // tendConn reserves a connection for tend so that it won't have to 50 // wait in queue for connections, since that will cause starvation 51 // and the node being dropped under load. 52 tendConn *Connection 53 tendConnLock sync.Mutex // All uses of tend connection should be synchronized 54 55 peersGeneration AtomicInt 56 peersCount AtomicInt 57 58 connections connectionHeap 59 connectionCount AtomicInt 60 health AtomicInt //AtomicInteger 61 62 partitionGeneration AtomicInt 63 referenceCount AtomicInt 64 failures AtomicInt 65 partitionChanged AtomicBool 66 67 active AtomicBool 68 69 supportsFloat, supportsBatchIndex, supportsReplicas, supportsGeo, supportsPeers, supportsLUTNow, supportsTruncateNamespace, supportsClusterStable, supportsBitwiseOps AtomicBool 70} 71 72// NewNode initializes a server node with connection parameters. 73func newNode(cluster *Cluster, nv *nodeValidator) *Node { 74 newNode := &Node{ 75 cluster: cluster, 76 name: nv.name, 77 // address: nv.primaryAddress, 78 host: nv.primaryHost, 79 80 // Assign host to first IP alias because the server identifies nodes 81 // by IP address (not hostname). 82 connections: *newConnectionHeap(cluster.clientPolicy.MinConnectionsPerNode, cluster.clientPolicy.ConnectionQueueSize), 83 connectionCount: *NewAtomicInt(0), 84 peersGeneration: *NewAtomicInt(-1), 85 partitionGeneration: *NewAtomicInt(-2), 86 referenceCount: *NewAtomicInt(0), 87 failures: *NewAtomicInt(0), 88 active: *NewAtomicBool(true), 89 partitionChanged: *NewAtomicBool(false), 90 91 supportsFloat: *NewAtomicBool(nv.supportsFloat), 92 supportsBatchIndex: *NewAtomicBool(nv.supportsBatchIndex), 93 supportsReplicas: *NewAtomicBool(nv.supportsReplicas), 94 supportsGeo: *NewAtomicBool(nv.supportsGeo), 95 supportsPeers: *NewAtomicBool(nv.supportsPeers), 96 supportsLUTNow: *NewAtomicBool(nv.supportsLUTNow), 97 supportsTruncateNamespace: *NewAtomicBool(nv.supportsTruncateNamespace), 98 supportsClusterStable: *NewAtomicBool(nv.supportsClusterStable), 99 supportsBitwiseOps: *NewAtomicBool(nv.supportsBitwiseOps), 100 } 101 102 newNode.aliases.Store(nv.aliases) 103 newNode._sessionToken.Store(nv.sessionToken) 104 newNode.racks.Store(map[string]int{}) 105 106 // this will reset to zero on first aggregation on the cluster, 107 // therefore will only be counted once. 108 atomic.AddInt64(&newNode.stats.NodeAdded, 1) 109 110 return newNode 111} 112 113// Refresh requests current status from server node, and updates node with the result. 114func (nd *Node) Refresh(peers *peers) error { 115 if !nd.active.Get() { 116 return nil 117 } 118 119 atomic.AddInt64(&nd.stats.TendsTotal, 1) 120 121 // Close idleConnections 122 defer nd.dropIdleConnections() 123 124 nd.referenceCount.Set(0) 125 126 var infoMap map[string]string 127 var err error 128 if peers.usePeers.Get() { 129 commands := []string{"node", "peers-generation", "partition-generation"} 130 if nd.cluster.clientPolicy.RackAware { 131 commands = append(commands, "racks:") 132 } 133 134 infoMap, err = nd.RequestInfo(&nd.cluster.infoPolicy, commands...) 135 if err != nil { 136 nd.refreshFailed(err) 137 return err 138 } 139 140 if err := nd.verifyNodeName(infoMap); err != nil { 141 nd.refreshFailed(err) 142 return err 143 } 144 145 if err := nd.verifyPeersGeneration(infoMap, peers); err != nil { 146 nd.refreshFailed(err) 147 return err 148 } 149 150 if err := nd.verifyPartitionGeneration(infoMap); err != nil { 151 nd.refreshFailed(err) 152 return err 153 } 154 } else { 155 commands := []string{"node", "partition-generation", nd.cluster.clientPolicy.servicesString()} 156 if nd.cluster.clientPolicy.RackAware { 157 commands = append(commands, "racks:") 158 } 159 160 infoMap, err = nd.RequestInfo(&nd.cluster.infoPolicy, commands...) 161 if err != nil { 162 nd.refreshFailed(err) 163 return err 164 } 165 166 if err := nd.verifyNodeName(infoMap); err != nil { 167 nd.refreshFailed(err) 168 return err 169 } 170 171 if err = nd.verifyPartitionGeneration(infoMap); err != nil { 172 nd.refreshFailed(err) 173 return err 174 } 175 176 if err = nd.addFriends(infoMap, peers); err != nil { 177 nd.refreshFailed(err) 178 return err 179 } 180 } 181 182 if err := nd.updateRackInfo(infoMap); err != nil { 183 // Update rack info should fail if the feature is not supported on the server 184 if aerr, ok := err.(AerospikeError); ok && aerr.ResultCode() == UNSUPPORTED_FEATURE { 185 nd.refreshFailed(err) 186 return err 187 } 188 // Should not fail in other cases 189 Logger.Warn("Updating node rack info failed with error: %s (racks: `%s`)", err, infoMap["racks:"]) 190 } 191 192 nd.failures.Set(0) 193 peers.refreshCount.IncrementAndGet() 194 nd.referenceCount.IncrementAndGet() 195 atomic.AddInt64(&nd.stats.TendsSuccessful, 1) 196 197 if err := nd.refreshSessionToken(); err != nil { 198 Logger.Error("Error refreshing session token: %s", err.Error()) 199 } 200 201 if _, err := nd.fillMinConns(); err != nil { 202 Logger.Error("Error filling up the connection queue to the minimum required") 203 } 204 205 return nil 206} 207 208// refreshSessionToken refreshes the session token if it has been expired 209func (nd *Node) refreshSessionToken() error { 210 // no session token to refresh 211 if !nd.cluster.clientPolicy.RequiresAuthentication() || nd.cluster.clientPolicy.AuthMode != AuthModeExternal { 212 return nil 213 } 214 215 var deadline time.Time 216 deadlineIfc := nd._sessionExpiration.Load() 217 if deadlineIfc != nil { 218 deadline = deadlineIfc.(time.Time) 219 } 220 221 if deadline.IsZero() || time.Now().Before(deadline) { 222 return nil 223 } 224 225 nd.tendConnLock.Lock() 226 defer nd.tendConnLock.Unlock() 227 228 if err := nd.initTendConn(nd.cluster.clientPolicy.LoginTimeout); err != nil { 229 return err 230 } 231 232 command := newLoginCommand(nd.tendConn.dataBuffer) 233 if err := command.login(&nd.cluster.clientPolicy, nd.tendConn, nd.cluster.Password()); err != nil { 234 // Socket not authenticated. Do not put back into pool. 235 nd.tendConn.Close() 236 return err 237 } 238 239 nd._sessionToken.Store(command.SessionToken) 240 nd._sessionExpiration.Store(command.SessionExpiration) 241 242 return nil 243} 244 245func (nd *Node) updateRackInfo(infoMap map[string]string) error { 246 if !nd.cluster.clientPolicy.RackAware { 247 return nil 248 } 249 250 // Do not raise an error if the server does not support rackaware 251 if strings.HasPrefix(strings.ToUpper(infoMap["racks:"]), "ERROR") { 252 return NewAerospikeError(UNSUPPORTED_FEATURE, "You have set the ClientPolicy.RackAware = true, but the server does not support this feature.") 253 } 254 255 ss := strings.Split(infoMap["racks:"], ";") 256 racks := map[string]int{} 257 for _, s := range ss { 258 in := bufio.NewReader(strings.NewReader(s)) 259 _, err := in.ReadString('=') 260 if err != nil { 261 return err 262 } 263 264 ns, err := in.ReadString(':') 265 if err != nil { 266 return err 267 } 268 269 for { 270 _, err = in.ReadString('_') 271 if err != nil { 272 return err 273 } 274 275 rackStr, err := in.ReadString('=') 276 if err != nil { 277 return err 278 } 279 280 rack, err := strconv.Atoi(rackStr[:len(rackStr)-1]) 281 if err != nil { 282 return err 283 } 284 285 nodesList, err := in.ReadString(':') 286 if err != nil && err != io.EOF { 287 return err 288 } 289 290 nodes := strings.Split(strings.Trim(nodesList, ":"), ",") 291 for i := range nodes { 292 if nodes[i] == nd.name { 293 racks[ns[:len(ns)-1]] = rack 294 } 295 } 296 297 if err == io.EOF { 298 break 299 } 300 } 301 } 302 303 nd.racks.Store(racks) 304 305 return nil 306} 307 308func (nd *Node) verifyNodeName(infoMap map[string]string) error { 309 infoName, exists := infoMap["node"] 310 311 if !exists || len(infoName) == 0 { 312 return NewAerospikeError(INVALID_NODE_ERROR, "Node name is empty") 313 } 314 315 if !(nd.name == infoName) { 316 // Set node to inactive immediately. 317 nd.active.Set(false) 318 return NewAerospikeError(INVALID_NODE_ERROR, "Node name has changed. Old="+nd.name+" New="+infoName) 319 } 320 return nil 321} 322 323func (nd *Node) verifyPeersGeneration(infoMap map[string]string, peers *peers) error { 324 genString := infoMap["peers-generation"] 325 if len(genString) == 0 { 326 return NewAerospikeError(PARSE_ERROR, "peers-generation is empty") 327 } 328 329 gen, err := strconv.Atoi(genString) 330 if err != nil { 331 return NewAerospikeError(PARSE_ERROR, "peers-generation is not a number: "+genString) 332 } 333 334 peers.genChanged.Or(nd.peersGeneration.Get() != gen) 335 return nil 336} 337 338func (nd *Node) verifyPartitionGeneration(infoMap map[string]string) error { 339 genString := infoMap["partition-generation"] 340 341 if len(genString) == 0 { 342 return NewAerospikeError(PARSE_ERROR, "partition-generation is empty") 343 } 344 345 gen, err := strconv.Atoi(genString) 346 if err != nil { 347 return NewAerospikeError(PARSE_ERROR, "partition-generation is not a number:"+genString) 348 } 349 350 if nd.partitionGeneration.Get() != gen { 351 nd.partitionChanged.Set(true) 352 } 353 return nil 354} 355 356func (nd *Node) addFriends(infoMap map[string]string, peers *peers) error { 357 friendString, exists := infoMap[nd.cluster.clientPolicy.servicesString()] 358 359 if !exists || len(friendString) == 0 { 360 nd.peersCount.Set(0) 361 return nil 362 } 363 364 friendNames := strings.Split(friendString, ";") 365 nd.peersCount.Set(len(friendNames)) 366 367 for _, friend := range friendNames { 368 friendInfo := strings.Split(friend, ":") 369 370 if len(friendInfo) != 2 { 371 Logger.Error("Node info from asinfo:services is malformed. Expected HOST:PORT, but got `%s`", friend) 372 continue 373 } 374 375 hostName := friendInfo[0] 376 port, _ := strconv.Atoi(friendInfo[1]) 377 378 if len(nd.cluster.clientPolicy.IpMap) > 0 { 379 if alternativeHost, ok := nd.cluster.clientPolicy.IpMap[hostName]; ok { 380 hostName = alternativeHost 381 } 382 } 383 384 host := NewHost(hostName, port) 385 node := nd.cluster.findAlias(host) 386 387 if node != nil { 388 node.referenceCount.IncrementAndGet() 389 } else { 390 if !peers.hostExists(*host) { 391 nd.prepareFriend(host, peers) 392 } 393 } 394 } 395 396 return nil 397} 398 399func (nd *Node) prepareFriend(host *Host, peers *peers) bool { 400 nv := &nodeValidator{} 401 if err := nv.validateNode(nd.cluster, host); err != nil { 402 Logger.Warn("Adding node `%s` failed: %s", host, err) 403 return false 404 } 405 406 node := peers.nodeByName(nv.name) 407 408 if node != nil { 409 // Duplicate node name found. This usually occurs when the server 410 // services list contains both internal and external IP addresses 411 // for the same node. 412 peers.addHost(*host) 413 node.addAlias(host) 414 return true 415 } 416 417 // Check for duplicate nodes in cluster. 418 node = nd.cluster.nodesMap.Get().(map[string]*Node)[nv.name] 419 420 if node != nil { 421 peers.addHost(*host) 422 node.addAlias(host) 423 node.referenceCount.IncrementAndGet() 424 nd.cluster.addAlias(host, node) 425 return true 426 } 427 428 node = nd.cluster.createNode(nv) 429 peers.addHost(*host) 430 peers.addNode(nv.name, node) 431 return true 432} 433 434func (nd *Node) refreshPeers(peers *peers) { 435 // Do not refresh peers when node connection has already failed during this cluster tend iteration. 436 if nd.failures.Get() > 0 || !nd.active.Get() { 437 return 438 } 439 440 peerParser, err := parsePeers(nd.cluster, nd) 441 if err != nil { 442 Logger.Debug("Parsing peers failed: %s", err) 443 nd.refreshFailed(err) 444 return 445 } 446 447 peers.appendPeers(peerParser.peers) 448 nd.peersGeneration.Set(int(peerParser.generation())) 449 nd.peersCount.Set(len(peers.peers())) 450 peers.refreshCount.IncrementAndGet() 451} 452 453func (nd *Node) refreshPartitions(peers *peers, partitions partitionMap) { 454 // Do not refresh peers when node connection has already failed during this cluster tend iteration. 455 // Also, avoid "split cluster" case where this node thinks it's a 1-node cluster. 456 // Unchecked, such a node can dominate the partition map and cause all other 457 // nodes to be dropped. 458 if nd.failures.Get() > 0 || !nd.active.Get() || (nd.peersCount.Get() == 0 && peers.refreshCount.Get() > 1) { 459 return 460 } 461 462 parser, err := newPartitionParser(nd, partitions, _PARTITIONS) 463 if err != nil { 464 nd.refreshFailed(err) 465 return 466 } 467 468 if parser.generation != nd.partitionGeneration.Get() { 469 Logger.Info("Node %s partition generation changed from %d to %d", nd.host.String(), nd.partitionGeneration.Get(), parser.getGeneration()) 470 nd.partitionChanged.Set(true) 471 nd.partitionGeneration.Set(parser.getGeneration()) 472 atomic.AddInt64(&nd.stats.PartitionMapUpdates, 1) 473 } 474} 475 476func (nd *Node) refreshFailed(e error) { 477 nd.failures.IncrementAndGet() 478 atomic.AddInt64(&nd.stats.TendsFailed, 1) 479 480 // Only log message if cluster is still active. 481 if nd.cluster.IsConnected() { 482 Logger.Warn("Node `%s` refresh failed: `%s`", nd, e) 483 } 484} 485 486// dropIdleConnections picks a connection from the head of the connection pool queue 487// if that connection is idle, it drops it and takes the next one until it picks 488// a fresh connection or exhaust the queue. 489func (nd *Node) dropIdleConnections() { 490 nd.connections.DropIdle() 491} 492 493// GetConnection gets a connection to the node. 494// If no pooled connection is available, a new connection will be created, unless 495// ClientPolicy.MaxQueueSize number of connections are already created. 496// This method will retry to retrieve a connection in case the connection pool 497// is empty, until timeout is reached. 498func (nd *Node) GetConnection(timeout time.Duration) (conn *Connection, err error) { 499 if timeout <= 0 { 500 timeout = _DEFAULT_TIMEOUT 501 } 502 deadline := time.Now().Add(timeout) 503 504 for time.Now().Before(deadline) { 505 conn, err = nd.getConnection(deadline, timeout) 506 if err == nil && conn != nil { 507 return conn, nil 508 } 509 510 if err == ErrServerNotAvailable { 511 return nil, err 512 } 513 514 time.Sleep(5 * time.Millisecond) 515 } 516 517 // in case the block didn't run at all 518 if err == nil { 519 err = ErrConnectionPoolEmpty 520 } 521 522 return nil, err 523} 524 525// getConnection gets a connection to the node. 526// If no pooled connection is available, a new connection will be created. 527func (nd *Node) getConnection(deadline time.Time, timeout time.Duration) (conn *Connection, err error) { 528 return nd.getConnectionWithHint(deadline, timeout, 0) 529} 530 531// newConnection will make a new connection for the node. 532func (nd *Node) newConnection(overrideThreshold bool) (*Connection, error) { 533 if !nd.active.Get() { 534 return nil, ErrServerNotAvailable 535 } 536 537 // if connection count is limited and enough connections are already created, don't create a new one 538 cc := nd.connectionCount.IncrementAndGet() 539 if nd.cluster.clientPolicy.LimitConnectionsToQueueSize && cc > nd.cluster.clientPolicy.ConnectionQueueSize { 540 nd.connectionCount.DecrementAndGet() 541 atomic.AddInt64(&nd.stats.ConnectionsPoolEmpty, 1) 542 543 return nil, ErrTooManyConnectionsForNode 544 } 545 546 // Check for opening connection threshold 547 if !overrideThreshold && nd.cluster.clientPolicy.OpeningConnectionThreshold > 0 { 548 ct := nd.cluster.connectionThreshold.IncrementAndGet() 549 if ct > nd.cluster.clientPolicy.OpeningConnectionThreshold { 550 nd.cluster.connectionThreshold.DecrementAndGet() 551 nd.connectionCount.DecrementAndGet() 552 553 return nil, ErrTooManyOpeningConnections 554 } 555 556 defer nd.cluster.connectionThreshold.DecrementAndGet() 557 } 558 559 atomic.AddInt64(&nd.stats.ConnectionsAttempts, 1) 560 conn, err := NewConnection(&nd.cluster.clientPolicy, nd.host) 561 if err != nil { 562 nd.connectionCount.DecrementAndGet() 563 atomic.AddInt64(&nd.stats.ConnectionsFailed, 1) 564 return nil, err 565 } 566 conn.node = nd 567 568 // need to authenticate 569 if err = conn.login(&nd.cluster.clientPolicy, nd.cluster.Password(), nd.sessionToken()); err != nil { 570 atomic.AddInt64(&nd.stats.ConnectionsFailed, 1) 571 572 // Socket not authenticated. Do not put back into pool. 573 conn.Close() 574 return nil, err 575 } 576 577 atomic.AddInt64(&nd.stats.ConnectionsSuccessful, 1) 578 conn.setIdleTimeout(nd.cluster.clientPolicy.IdleTimeout) 579 580 return conn, nil 581} 582 583// makeConnectionForPool will try to open a connection until deadline. 584// if no deadline is defined, it will only try for _DEFAULT_TIMEOUT. 585func (nd *Node) makeConnectionForPool(hint byte) { 586 conn, err := nd.newConnection(false) 587 if err != nil { 588 Logger.Debug("Error trying to make a connection to the node %s: %s", nd.String(), err.Error()) 589 return 590 } 591 592 nd.putConnectionWithHint(conn, hint) 593} 594 595// getConnectionWithHint gets a connection to the node. 596// If no pooled connection is available, a new connection will be created. 597func (nd *Node) getConnectionWithHint(deadline time.Time, timeout time.Duration, hint byte) (conn *Connection, err error) { 598 if !nd.active.Get() { 599 return nil, ErrServerNotAvailable 600 } 601 602 // try to get a valid connection from the connection pool 603 for conn = nd.connections.Poll(hint); conn != nil; conn = nd.connections.Poll(hint) { 604 if conn.IsConnected() { 605 break 606 } 607 conn.Close() 608 conn = nil 609 } 610 611 if conn == nil { 612 go nd.makeConnectionForPool(hint) 613 return nil, ErrConnectionPoolEmpty 614 } 615 616 if err = conn.SetTimeout(deadline, timeout); err != nil { 617 atomic.AddInt64(&nd.stats.ConnectionsFailed, 1) 618 619 // Do not put back into pool. 620 conn.Close() 621 return nil, err 622 } 623 624 conn.refresh() 625 626 return conn, nil 627} 628 629// PutConnection puts back a connection to the pool. 630// If connection pool is full, the connection will be 631// closed and discarded. 632func (nd *Node) putConnectionWithHint(conn *Connection, hint byte) bool { 633 conn.refresh() 634 if !nd.active.Get() || !nd.connections.Offer(conn, hint) { 635 conn.Close() 636 return false 637 } 638 return true 639} 640 641// PutConnection puts back a connection to the pool. 642// If connection pool is full, the connection will be 643// closed and discarded. 644func (nd *Node) PutConnection(conn *Connection) { 645 nd.putConnectionWithHint(conn, 0) 646} 647 648// InvalidateConnection closes and discards a connection from the pool. 649func (nd *Node) InvalidateConnection(conn *Connection) { 650 conn.Close() 651} 652 653// GetHost retrieves host for the node. 654func (nd *Node) GetHost() *Host { 655 return nd.host 656} 657 658// IsActive Checks if the node is active. 659func (nd *Node) IsActive() bool { 660 return nd != nil && nd.active.Get() && nd.partitionGeneration.Get() >= -1 661} 662 663// GetName returns node name. 664func (nd *Node) GetName() string { 665 return nd.name 666} 667 668// GetAliases returns node aliases. 669func (nd *Node) GetAliases() []*Host { 670 return nd.aliases.Load().([]*Host) 671} 672 673// Sets node aliases 674func (nd *Node) setAliases(aliases []*Host) { 675 nd.aliases.Store(aliases) 676} 677 678// AddAlias adds an alias for the node 679func (nd *Node) addAlias(aliasToAdd *Host) { 680 // Aliases are only referenced in the cluster tend goroutine, 681 // so synchronization is not necessary. 682 aliases := nd.GetAliases() 683 if aliases == nil { 684 aliases = []*Host{} 685 } 686 687 aliases = append(aliases, aliasToAdd) 688 nd.setAliases(aliases) 689} 690 691// Close marks node as inactive and closes all of its pooled connections. 692func (nd *Node) Close() { 693 if nd.active.Get() { 694 nd.active.Set(false) 695 atomic.AddInt64(&nd.stats.NodeRemoved, 1) 696 } 697 nd.closeConnections() 698 nd.connections.cleanup() 699} 700 701// String implements stringer interface 702func (nd *Node) String() string { 703 return nd.name + " " + nd.host.String() 704} 705 706func (nd *Node) closeConnections() { 707 for conn := nd.connections.Poll(0); conn != nil; conn = nd.connections.Poll(0) { 708 conn.Close() 709 } 710 711 // close the tend connection 712 nd.tendConnLock.Lock() 713 defer nd.tendConnLock.Unlock() 714 if nd.tendConn != nil { 715 nd.tendConn.Close() 716 } 717} 718 719// Equals compares equality of two nodes based on their names. 720func (nd *Node) Equals(other *Node) bool { 721 return nd != nil && other != nil && (nd == other || nd.name == other.name) 722} 723 724// MigrationInProgress determines if the node is participating in a data migration 725func (nd *Node) MigrationInProgress() (bool, error) { 726 values, err := nd.RequestStats(&nd.cluster.infoPolicy) 727 if err != nil { 728 return false, err 729 } 730 731 // if the migrate_partitions_remaining exists and is not `0`, then migration is in progress 732 if migration, exists := values["migrate_partitions_remaining"]; exists && migration != "0" { 733 return true, nil 734 } 735 736 // migration not in progress 737 return false, nil 738} 739 740// WaitUntillMigrationIsFinished will block until migration operations are finished. 741func (nd *Node) WaitUntillMigrationIsFinished(timeout time.Duration) (err error) { 742 if timeout <= 0 { 743 timeout = _NO_TIMEOUT 744 } 745 done := make(chan error) 746 747 go func() { 748 // this function is guaranteed to return after timeout 749 // no go routines will be leaked 750 for { 751 if res, err := nd.MigrationInProgress(); err != nil || !res { 752 done <- err 753 return 754 } 755 } 756 }() 757 758 dealine := time.After(timeout) 759 select { 760 case <-dealine: 761 return NewAerospikeError(TIMEOUT) 762 case err = <-done: 763 return err 764 } 765} 766 767// initTendConn sets up a connection to be used for info requests. 768// The same connection will be used for tend. 769func (nd *Node) initTendConn(timeout time.Duration) error { 770 if timeout <= 0 { 771 timeout = _DEFAULT_TIMEOUT 772 } 773 deadline := time.Now().Add(timeout) 774 775 if nd.tendConn == nil || !nd.tendConn.IsConnected() { 776 var tendConn *Connection 777 var err error 778 if nd.connectionCount.Get() == 0 { 779 // if there are no connections in the pool, create a new connection synchronously. 780 // this will make sure the initial tend will get a connection without multiple retries. 781 tendConn, err = nd.newConnection(true) 782 } else { 783 tendConn, err = nd.GetConnection(timeout) 784 } 785 786 if err != nil { 787 return err 788 } 789 nd.tendConn = tendConn 790 } 791 792 // Set timeout for tend conn 793 return nd.tendConn.SetTimeout(deadline, timeout) 794} 795 796// requestInfoWithRetry gets info values by name from the specified database server node. 797// It will try at least N times before returning an error. 798func (nd *Node) requestInfoWithRetry(policy *InfoPolicy, n int, name ...string) (res map[string]string, err error) { 799 for i := 0; i < n; i++ { 800 if res, err = nd.requestInfo(policy.Timeout, name...); err == nil { 801 return res, nil 802 } 803 804 Logger.Error("Error occurred while fetching info from the server node %s: %s", nd.host.String(), err.Error()) 805 time.Sleep(100 * time.Millisecond) 806 } 807 808 // return the last error 809 return nil, err 810} 811 812// RequestInfo gets info values by name from the specified database server node. 813func (nd *Node) RequestInfo(policy *InfoPolicy, name ...string) (map[string]string, error) { 814 return nd.requestInfo(policy.Timeout, name...) 815} 816 817// RequestInfo gets info values by name from the specified database server node. 818func (nd *Node) requestInfo(timeout time.Duration, name ...string) (map[string]string, error) { 819 nd.tendConnLock.Lock() 820 defer nd.tendConnLock.Unlock() 821 822 if err := nd.initTendConn(timeout); err != nil { 823 return nil, err 824 } 825 826 response, err := RequestInfo(nd.tendConn, name...) 827 if err != nil { 828 nd.tendConn.Close() 829 return nil, err 830 } 831 return response, nil 832} 833 834// requestRawInfo gets info values by name from the specified database server node. 835// It won't parse the results. 836func (nd *Node) requestRawInfo(policy *InfoPolicy, name ...string) (*info, error) { 837 nd.tendConnLock.Lock() 838 defer nd.tendConnLock.Unlock() 839 840 if err := nd.initTendConn(policy.Timeout); err != nil { 841 return nil, err 842 } 843 844 response, err := newInfo(nd.tendConn, name...) 845 if err != nil { 846 nd.tendConn.Close() 847 return nil, err 848 } 849 return response, nil 850} 851 852// RequestStats returns statistics for the specified node as a map 853func (node *Node) RequestStats(policy *InfoPolicy) (map[string]string, error) { 854 infoMap, err := node.RequestInfo(policy, "statistics") 855 if err != nil { 856 return nil, err 857 } 858 859 res := map[string]string{} 860 861 v, exists := infoMap["statistics"] 862 if !exists { 863 return res, nil 864 } 865 866 values := strings.Split(v, ";") 867 for i := range values { 868 kv := strings.Split(values[i], "=") 869 if len(kv) > 1 { 870 res[kv[0]] = kv[1] 871 } 872 } 873 874 return res, nil 875} 876 877// sessionToken returns the session token for the node. 878// It will return nil if the session has expired. 879func (nd *Node) sessionToken() []byte { 880 var deadline time.Time 881 deadlineIfc := nd._sessionExpiration.Load() 882 if deadlineIfc != nil { 883 deadline = deadlineIfc.(time.Time) 884 } 885 886 if deadline.IsZero() || time.Now().After(deadline) { 887 return nil 888 } 889 890 st := nd._sessionToken.Load() 891 if st != nil { 892 return st.([]byte) 893 } 894 return nil 895} 896 897// Rack returns the rack number for the namespace. 898func (nd *Node) Rack(namespace string) (int, error) { 899 racks := nd.racks.Load().(map[string]int) 900 v, exists := racks[namespace] 901 902 if exists { 903 return v, nil 904 } 905 906 return -1, newAerospikeNodeError(nd, RACK_NOT_DEFINED) 907} 908 909// Rack returns the rack number for the namespace. 910func (nd *Node) hasRack(namespace string, rack int) bool { 911 racks := nd.racks.Load().(map[string]int) 912 v, exists := racks[namespace] 913 914 if !exists { 915 return false 916 } 917 918 return v == rack 919} 920 921// WarmUp fills the node's connection pool with connections. 922// This is necessary on startup for high traffic programs. 923// If the count is <= 0, the connection queue will be filled. 924// If the count is more than the size of the pool, the pool will be filled. 925// Note: One connection per node is reserved for tend operations and is not used for transactions. 926func (nd *Node) WarmUp(count int) (int, error) { 927 var g errgroup.Group 928 cnt := NewAtomicInt(0) 929 930 toAlloc := nd.connections.Cap() - nd.connectionCount.Get() 931 if count < toAlloc && count > 0 { 932 toAlloc = count 933 } 934 935 for i := 0; i < toAlloc; i++ { 936 g.Go(func() error { 937 conn, err := nd.newConnection(true) 938 if err != nil { 939 if err == ErrTooManyConnectionsForNode { 940 return nil 941 } 942 return err 943 } 944 945 if nd.putConnectionWithHint(conn, 0) { 946 cnt.IncrementAndGet() 947 } else { 948 conn.Close() 949 } 950 951 return nil 952 }) 953 } 954 955 err := g.Wait() 956 return cnt.Get(), err 957} 958 959// fillMinCounts will fill the connection pool to the minimum required 960// by the ClientPolicy.MinConnectionsPerNode 961func (nd *Node) fillMinConns() (int, error) { 962 if nd.cluster.clientPolicy.MinConnectionsPerNode > 0 { 963 toFill := nd.cluster.clientPolicy.MinConnectionsPerNode - nd.connectionCount.Get() 964 if toFill > 0 { 965 return nd.WarmUp(toFill) 966 } 967 } 968 return 0, nil 969} 970