1// Copyright 2013-2020 Aerospike, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aerospike 16 17import ( 18 "bytes" 19 "context" 20 "encoding/base64" 21 "encoding/json" 22 "errors" 23 "fmt" 24 "io/ioutil" 25 "runtime" 26 "strconv" 27 "strings" 28 "sync" 29 "time" 30 31 "golang.org/x/sync/semaphore" 32 33 . "github.com/aerospike/aerospike-client-go/internal/atomic" 34 . "github.com/aerospike/aerospike-client-go/logger" 35 . "github.com/aerospike/aerospike-client-go/types" 36 xornd "github.com/aerospike/aerospike-client-go/types/rand" 37) 38 39// Client encapsulates an Aerospike cluster. 40// All database operations are available against this object. 41type Client struct { 42 cluster *Cluster 43 44 // DefaultPolicy is used for all read commands without a specific policy. 45 DefaultPolicy *BasePolicy 46 // DefaultBatchPolicy is used for all batch commands without a specific policy. 47 DefaultBatchPolicy *BatchPolicy 48 // DefaultWritePolicy is used for all write commands without a specific policy. 49 DefaultWritePolicy *WritePolicy 50 // DefaultScanPolicy is used for all scan commands without a specific policy. 51 DefaultScanPolicy *ScanPolicy 52 // DefaultQueryPolicy is used for all query commands without a specific policy. 53 DefaultQueryPolicy *QueryPolicy 54 // DefaultAdminPolicy is used for all security commands without a specific policy. 55 DefaultAdminPolicy *AdminPolicy 56} 57 58func clientFinalizer(f *Client) { 59 f.Close() 60} 61 62//------------------------------------------------------- 63// Constructors 64//------------------------------------------------------- 65 66// NewClient generates a new Client instance. 67func NewClient(hostname string, port int) (*Client, error) { 68 return NewClientWithPolicyAndHost(NewClientPolicy(), NewHost(hostname, port)) 69} 70 71// NewClientWithPolicy generates a new Client using the specified ClientPolicy. 72// If the policy is nil, the default relevant policy will be used. 73func NewClientWithPolicy(policy *ClientPolicy, hostname string, port int) (*Client, error) { 74 return NewClientWithPolicyAndHost(policy, NewHost(hostname, port)) 75} 76 77// NewClientWithPolicyAndHost generates a new Client the specified ClientPolicy and 78// sets up the cluster using the provided hosts. 79// If the policy is nil, the default relevant policy will be used. 80func NewClientWithPolicyAndHost(policy *ClientPolicy, hosts ...*Host) (*Client, error) { 81 if policy == nil { 82 policy = NewClientPolicy() 83 } 84 85 cluster, err := NewCluster(policy, hosts) 86 if err != nil && policy.FailIfNotConnected { 87 if aerr, ok := err.(AerospikeError); ok { 88 Logger.Debug("Failed to connect to host(s): %v; error: %s", hosts, err) 89 return nil, aerr 90 } 91 return nil, fmt.Errorf("Failed to connect to host(s): %v; error: %s", hosts, err) 92 } 93 94 client := &Client{ 95 cluster: cluster, 96 DefaultPolicy: NewPolicy(), 97 DefaultBatchPolicy: NewBatchPolicy(), 98 DefaultWritePolicy: NewWritePolicy(0, 0), 99 DefaultScanPolicy: NewScanPolicy(), 100 DefaultQueryPolicy: NewQueryPolicy(), 101 DefaultAdminPolicy: NewAdminPolicy(), 102 } 103 104 runtime.SetFinalizer(client, clientFinalizer) 105 return client, err 106 107} 108 109//------------------------------------------------------- 110// Cluster Connection Management 111//------------------------------------------------------- 112 113// Close closes all client connections to database server nodes. 114func (clnt *Client) Close() { 115 clnt.cluster.Close() 116} 117 118// IsConnected determines if the client is ready to talk to the database server cluster. 119func (clnt *Client) IsConnected() bool { 120 return clnt.cluster.IsConnected() 121} 122 123// GetNodes returns an array of active server nodes in the cluster. 124func (clnt *Client) GetNodes() []*Node { 125 return clnt.cluster.GetNodes() 126} 127 128// GetNodeNames returns a list of active server node names in the cluster. 129func (clnt *Client) GetNodeNames() []string { 130 nodes := clnt.cluster.GetNodes() 131 names := make([]string, 0, len(nodes)) 132 133 for _, node := range nodes { 134 names = append(names, node.GetName()) 135 } 136 return names 137} 138 139//------------------------------------------------------- 140// Write Record Operations 141//------------------------------------------------------- 142 143// Put writes record bin(s) to the server. 144// The policy specifies the transaction timeout, record expiration and how the transaction is 145// handled when the record already exists. 146// If the policy is nil, the default relevant policy will be used. 147func (clnt *Client) Put(policy *WritePolicy, key *Key, binMap BinMap) error { 148 policy = clnt.getUsableWritePolicy(policy) 149 command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _WRITE) 150 if err != nil { 151 return err 152 } 153 154 return command.Execute() 155} 156 157// PutBins writes record bin(s) to the server. 158// The policy specifies the transaction timeout, record expiration and how the transaction is 159// handled when the record already exists. 160// This method avoids using the BinMap allocation and iteration and is lighter on GC. 161// If the policy is nil, the default relevant policy will be used. 162func (clnt *Client) PutBins(policy *WritePolicy, key *Key, bins ...*Bin) error { 163 policy = clnt.getUsableWritePolicy(policy) 164 command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _WRITE) 165 if err != nil { 166 return err 167 } 168 169 return command.Execute() 170} 171 172//------------------------------------------------------- 173// Operations string 174//------------------------------------------------------- 175 176// Append appends bin value's string to existing record bin values. 177// The policy specifies the transaction timeout, record expiration and how the transaction is 178// handled when the record already exists. 179// This call only works for string and []byte values. 180// If the policy is nil, the default relevant policy will be used. 181func (clnt *Client) Append(policy *WritePolicy, key *Key, binMap BinMap) error { 182 policy = clnt.getUsableWritePolicy(policy) 183 command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _APPEND) 184 if err != nil { 185 return err 186 } 187 188 return command.Execute() 189} 190 191// AppendBins works the same as Append, but avoids BinMap allocation and iteration. 192func (clnt *Client) AppendBins(policy *WritePolicy, key *Key, bins ...*Bin) error { 193 policy = clnt.getUsableWritePolicy(policy) 194 command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _APPEND) 195 if err != nil { 196 return err 197 } 198 199 return command.Execute() 200} 201 202// Prepend prepends bin value's string to existing record bin values. 203// The policy specifies the transaction timeout, record expiration and how the transaction is 204// handled when the record already exists. 205// This call works only for string and []byte values. 206// If the policy is nil, the default relevant policy will be used. 207func (clnt *Client) Prepend(policy *WritePolicy, key *Key, binMap BinMap) error { 208 policy = clnt.getUsableWritePolicy(policy) 209 command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _PREPEND) 210 if err != nil { 211 return err 212 } 213 214 return command.Execute() 215} 216 217// PrependBins works the same as Prepend, but avoids BinMap allocation and iteration. 218func (clnt *Client) PrependBins(policy *WritePolicy, key *Key, bins ...*Bin) error { 219 policy = clnt.getUsableWritePolicy(policy) 220 command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _PREPEND) 221 if err != nil { 222 return err 223 } 224 225 return command.Execute() 226} 227 228//------------------------------------------------------- 229// Arithmetic Operations 230//------------------------------------------------------- 231 232// Add adds integer bin values to existing record bin values. 233// The policy specifies the transaction timeout, record expiration and how the transaction is 234// handled when the record already exists. 235// This call only works for integer values. 236// If the policy is nil, the default relevant policy will be used. 237func (clnt *Client) Add(policy *WritePolicy, key *Key, binMap BinMap) error { 238 policy = clnt.getUsableWritePolicy(policy) 239 command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _ADD) 240 if err != nil { 241 return err 242 } 243 244 return command.Execute() 245} 246 247// AddBins works the same as Add, but avoids BinMap allocation and iteration. 248func (clnt *Client) AddBins(policy *WritePolicy, key *Key, bins ...*Bin) error { 249 policy = clnt.getUsableWritePolicy(policy) 250 command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _ADD) 251 if err != nil { 252 return err 253 } 254 255 return command.Execute() 256} 257 258//------------------------------------------------------- 259// Delete Operations 260//------------------------------------------------------- 261 262// Delete deletes a record for specified key. 263// The policy specifies the transaction timeout. 264// If the policy is nil, the default relevant policy will be used. 265func (clnt *Client) Delete(policy *WritePolicy, key *Key) (bool, error) { 266 policy = clnt.getUsableWritePolicy(policy) 267 command, err := newDeleteCommand(clnt.cluster, policy, key) 268 if err != nil { 269 return false, err 270 } 271 272 err = command.Execute() 273 return command.Existed(), err 274} 275 276//------------------------------------------------------- 277// Touch Operations 278//------------------------------------------------------- 279 280// Touch updates a record's metadata. 281// If the record exists, the record's TTL will be reset to the 282// policy's expiration. 283// If the record doesn't exist, it will return an error. 284func (clnt *Client) Touch(policy *WritePolicy, key *Key) error { 285 policy = clnt.getUsableWritePolicy(policy) 286 command, err := newTouchCommand(clnt.cluster, policy, key) 287 if err != nil { 288 return err 289 } 290 291 return command.Execute() 292} 293 294//------------------------------------------------------- 295// Existence-Check Operations 296//------------------------------------------------------- 297 298// Exists determine if a record key exists. 299// The policy can be used to specify timeouts. 300// If the policy is nil, the default relevant policy will be used. 301func (clnt *Client) Exists(policy *BasePolicy, key *Key) (bool, error) { 302 policy = clnt.getUsablePolicy(policy) 303 command, err := newExistsCommand(clnt.cluster, policy, key) 304 if err != nil { 305 return false, err 306 } 307 308 err = command.Execute() 309 return command.Exists(), err 310} 311 312// BatchExists determines if multiple record keys exist in one batch request. 313// The returned boolean array is in positional order with the original key array order. 314// The policy can be used to specify timeouts. 315// If the policy is nil, the default relevant policy will be used. 316func (clnt *Client) BatchExists(policy *BatchPolicy, keys []*Key) ([]bool, error) { 317 policy = clnt.getUsableBatchPolicy(policy) 318 319 // same array can be used without synchronization; 320 // when a key exists, the corresponding index will be marked true 321 existsArray := make([]bool, len(keys)) 322 323 batchNodes, err := newBatchNodeList(clnt.cluster, policy, keys) 324 if err != nil { 325 return nil, err 326 } 327 328 // pass nil to make sure it will be cloned and prepared 329 cmd := newBatchCommandExists(nil, nil, policy, keys, existsArray) 330 err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd) 331 if err != nil { 332 return nil, err 333 } 334 335 if filteredOut > 0 { 336 err = ErrFilteredOut 337 } 338 339 return existsArray, err 340} 341 342//------------------------------------------------------- 343// Read Record Operations 344//------------------------------------------------------- 345 346// Get reads a record header and bins for specified key. 347// The policy can be used to specify timeouts. 348// If the policy is nil, the default relevant policy will be used. 349func (clnt *Client) Get(policy *BasePolicy, key *Key, binNames ...string) (*Record, error) { 350 policy = clnt.getUsablePolicy(policy) 351 352 command, err := newReadCommand(clnt.cluster, policy, key, binNames, nil) 353 if err != nil { 354 return nil, err 355 } 356 357 if err := command.Execute(); err != nil { 358 return nil, err 359 } 360 return command.GetRecord(), nil 361} 362 363// GetHeader reads a record generation and expiration only for specified key. 364// Bins are not read. 365// The policy can be used to specify timeouts. 366// If the policy is nil, the default relevant policy will be used. 367func (clnt *Client) GetHeader(policy *BasePolicy, key *Key) (*Record, error) { 368 policy = clnt.getUsablePolicy(policy) 369 370 command, err := newReadHeaderCommand(clnt.cluster, policy, key) 371 if err != nil { 372 return nil, err 373 } 374 375 if err := command.Execute(); err != nil { 376 return nil, err 377 } 378 return command.GetRecord(), nil 379} 380 381//------------------------------------------------------- 382// Batch Read Operations 383//------------------------------------------------------- 384 385// BatchGet reads multiple record headers and bins for specified keys in one batch request. 386// The returned records are in positional order with the original key array order. 387// If a key is not found, the positional record will be nil. 388// The policy can be used to specify timeouts. 389// If the policy is nil, the default relevant policy will be used. 390func (clnt *Client) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...string) ([]*Record, error) { 391 policy = clnt.getUsableBatchPolicy(policy) 392 393 // same array can be used without synchronization; 394 // when a key exists, the corresponding index will be set to record 395 records := make([]*Record, len(keys)) 396 397 batchNodes, err := newBatchNodeList(clnt.cluster, policy, keys) 398 if err != nil { 399 return nil, err 400 } 401 402 cmd := newBatchCommandGet(nil, nil, policy, keys, binNames, records, _INFO1_READ) 403 err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd) 404 if err != nil && !policy.AllowPartialResults { 405 return nil, err 406 } 407 408 if filteredOut > 0 { 409 if err == nil { 410 err = ErrFilteredOut 411 } else { 412 mergeErrors([]error{err, ErrFilteredOut}) 413 } 414 } 415 416 return records, err 417} 418 419// BatchGetComplex reads multiple records for specified batch keys in one batch call. 420// This method allows different namespaces/bins to be requested for each key in the batch. 421// The returned records are located in the same list. 422// If the BatchRead key field is not found, the corresponding record field will be null. 423// The policy can be used to specify timeouts and maximum concurrent threads. 424// This method requires Aerospike Server version >= 3.6.0. 425func (clnt *Client) BatchGetComplex(policy *BatchPolicy, records []*BatchRead) error { 426 policy = clnt.getUsableBatchPolicy(policy) 427 428 cmd := newBatchIndexCommandGet(nil, policy, records) 429 430 batchNodes, err := newBatchIndexNodeList(clnt.cluster, policy, records) 431 if err != nil { 432 return err 433 } 434 435 err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd) 436 if err != nil && !policy.AllowPartialResults { 437 return err 438 } 439 440 if filteredOut > 0 { 441 if err == nil { 442 err = ErrFilteredOut 443 } else { 444 mergeErrors([]error{err, ErrFilteredOut}) 445 } 446 } 447 448 return err 449} 450 451// BatchGetHeader reads multiple record header data for specified keys in one batch request. 452// The returned records are in positional order with the original key array order. 453// If a key is not found, the positional record will be nil. 454// The policy can be used to specify timeouts. 455// If the policy is nil, the default relevant policy will be used. 456func (clnt *Client) BatchGetHeader(policy *BatchPolicy, keys []*Key) ([]*Record, error) { 457 policy = clnt.getUsableBatchPolicy(policy) 458 459 // same array can be used without synchronization; 460 // when a key exists, the corresponding index will be set to record 461 records := make([]*Record, len(keys)) 462 463 batchNodes, err := newBatchNodeList(clnt.cluster, policy, keys) 464 if err != nil { 465 return nil, err 466 } 467 468 cmd := newBatchCommandGet(nil, nil, policy, keys, nil, records, _INFO1_READ|_INFO1_NOBINDATA) 469 err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd) 470 if err != nil && !policy.AllowPartialResults { 471 return nil, err 472 } 473 474 if filteredOut > 0 { 475 if err == nil { 476 err = ErrFilteredOut 477 } else { 478 mergeErrors([]error{err, ErrFilteredOut}) 479 } 480 } 481 482 return records, err 483} 484 485//------------------------------------------------------- 486// Generic Database Operations 487//------------------------------------------------------- 488 489// Operate performs multiple read/write operations on a single key in one batch request. 490// An example would be to add an integer value to an existing record and then 491// read the result, all in one database call. 492// 493// If the policy is nil, the default relevant policy will be used. 494func (clnt *Client) Operate(policy *WritePolicy, key *Key, operations ...*Operation) (*Record, error) { 495 policy = clnt.getUsableWritePolicy(policy) 496 command, err := newOperateCommand(clnt.cluster, policy, key, operations) 497 if err != nil { 498 return nil, err 499 } 500 501 if err := command.Execute(); err != nil { 502 return nil, err 503 } 504 return command.GetRecord(), nil 505} 506 507//------------------------------------------------------- 508// Scan Operations 509//------------------------------------------------------- 510 511// ScanAll reads all records in specified namespace and set from all nodes. 512// If the policy's concurrentNodes is specified, each server node will be read in 513// parallel. Otherwise, server nodes are read sequentially. 514// If the policy is nil, the default relevant policy will be used. 515func (clnt *Client) ScanAll(apolicy *ScanPolicy, namespace string, setName string, binNames ...string) (*Recordset, error) { 516 policy := *clnt.getUsableScanPolicy(apolicy) 517 518 nodes := clnt.cluster.GetNodes() 519 if len(nodes) == 0 { 520 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.") 521 } 522 523 clusterKey := int64(0) 524 if policy.FailOnClusterChange { 525 var err error 526 clusterKey, err = queryValidateBegin(nodes[0], namespace) 527 if err != nil { 528 return nil, err 529 } 530 } 531 532 first := NewAtomicBool(true) 533 534 taskID := uint64(xornd.Int64()) 535 536 // result recordset 537 res := newRecordset(policy.RecordQueueSize, len(nodes), taskID) 538 539 // the whole call should be wrapped in a goroutine 540 if policy.ConcurrentNodes { 541 for _, node := range nodes { 542 go func(node *Node, first bool) { 543 clnt.scanNode(&policy, node, res, namespace, setName, taskID, clusterKey, first, binNames...) 544 }(node, first.CompareAndToggle(true)) 545 } 546 } else { 547 // scan nodes one by one 548 go func() { 549 for _, node := range nodes { 550 clnt.scanNode(&policy, node, res, namespace, setName, taskID, clusterKey, first.CompareAndToggle(true), binNames...) 551 } 552 }() 553 } 554 555 return res, nil 556} 557 558// ScanNode reads all records in specified namespace and set for one node only. 559// If the policy is nil, the default relevant policy will be used. 560func (clnt *Client) ScanNode(apolicy *ScanPolicy, node *Node, namespace string, setName string, binNames ...string) (*Recordset, error) { 561 policy := *clnt.getUsableScanPolicy(apolicy) 562 563 clusterKey := int64(0) 564 if policy.FailOnClusterChange { 565 var err error 566 clusterKey, err = queryValidateBegin(node, namespace) 567 if err != nil { 568 return nil, err 569 } 570 } 571 572 // results channel must be async for performance 573 taskID := uint64(xornd.Int64()) 574 res := newRecordset(policy.RecordQueueSize, 1, taskID) 575 576 go clnt.scanNode(&policy, node, res, namespace, setName, taskID, clusterKey, true, binNames...) 577 return res, nil 578} 579 580// ScanNode reads all records in specified namespace and set for one node only. 581// If the policy is nil, the default relevant policy will be used. 582func (clnt *Client) scanNode(policy *ScanPolicy, node *Node, recordset *Recordset, namespace string, setName string, taskID uint64, clusterKey int64, first bool, binNames ...string) error { 583 command := newScanCommand(node, policy, namespace, setName, binNames, recordset, taskID, clusterKey, first) 584 return command.Execute() 585} 586 587//--------------------------------------------------------------- 588// User defined functions (Supported by Aerospike 3+ servers only) 589//--------------------------------------------------------------- 590 591// RegisterUDFFromFile reads a file from file system and registers 592// the containing a package user defined functions with the server. 593// This asynchronous server call will return before command is complete. 594// The user can optionally wait for command completion by using the returned 595// RegisterTask instance. 596// 597// This method is only supported by Aerospike 3+ servers. 598// If the policy is nil, the default relevant policy will be used. 599func (clnt *Client) RegisterUDFFromFile(policy *WritePolicy, clientPath string, serverPath string, language Language) (*RegisterTask, error) { 600 policy = clnt.getUsableWritePolicy(policy) 601 udfBody, err := ioutil.ReadFile(clientPath) 602 if err != nil { 603 return nil, err 604 } 605 606 return clnt.RegisterUDF(policy, udfBody, serverPath, language) 607} 608 609// RegisterUDF registers a package containing user defined functions with server. 610// This asynchronous server call will return before command is complete. 611// The user can optionally wait for command completion by using the returned 612// RegisterTask instance. 613// 614// This method is only supported by Aerospike 3+ servers. 615// If the policy is nil, the default relevant policy will be used. 616func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath string, language Language) (*RegisterTask, error) { 617 policy = clnt.getUsableWritePolicy(policy) 618 content := base64.StdEncoding.EncodeToString(udfBody) 619 620 var strCmd bytes.Buffer 621 // errors are to remove errcheck warnings 622 // they will always be nil as stated in golang docs 623 _, err := strCmd.WriteString("udf-put:filename=") 624 _, err = strCmd.WriteString(serverPath) 625 _, err = strCmd.WriteString(";content=") 626 _, err = strCmd.WriteString(content) 627 _, err = strCmd.WriteString(";content-len=") 628 _, err = strCmd.WriteString(strconv.Itoa(len(content))) 629 _, err = strCmd.WriteString(";udf-type=") 630 _, err = strCmd.WriteString(string(language)) 631 _, err = strCmd.WriteString(";") 632 633 // Send UDF to one node. That node will distribute the UDF to other nodes. 634 responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String()) 635 if err != nil { 636 return nil, err 637 } 638 639 response := responseMap[strCmd.String()] 640 res := make(map[string]string) 641 vals := strings.Split(response, ";") 642 for _, pair := range vals { 643 t := strings.SplitN(pair, "=", 2) 644 if len(t) == 2 { 645 res[t[0]] = t[1] 646 } else if len(t) == 1 { 647 res[t[0]] = "" 648 } 649 } 650 651 if _, exists := res["error"]; exists { 652 msg, _ := base64.StdEncoding.DecodeString(res["message"]) 653 return nil, NewAerospikeError(COMMAND_REJECTED, fmt.Sprintf("Registration failed: %s\nFile: %s\nLine: %s\nMessage: %s", 654 res["error"], res["file"], res["line"], msg)) 655 } 656 return NewRegisterTask(clnt.cluster, serverPath), nil 657} 658 659// RemoveUDF removes a package containing user defined functions in the server. 660// This asynchronous server call will return before command is complete. 661// The user can optionally wait for command completion by using the returned 662// RemoveTask instance. 663// 664// This method is only supported by Aerospike 3+ servers. 665// If the policy is nil, the default relevant policy will be used. 666func (clnt *Client) RemoveUDF(policy *WritePolicy, udfName string) (*RemoveTask, error) { 667 policy = clnt.getUsableWritePolicy(policy) 668 var strCmd bytes.Buffer 669 // errors are to remove errcheck warnings 670 // they will always be nil as stated in golang docs 671 _, err := strCmd.WriteString("udf-remove:filename=") 672 _, err = strCmd.WriteString(udfName) 673 _, err = strCmd.WriteString(";") 674 675 // Send command to one node. That node will distribute it to other nodes. 676 responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String()) 677 if err != nil { 678 return nil, err 679 } 680 681 response := responseMap[strCmd.String()] 682 if response == "ok" { 683 return NewRemoveTask(clnt.cluster, udfName), nil 684 } 685 return nil, NewAerospikeError(SERVER_ERROR, response) 686} 687 688// ListUDF lists all packages containing user defined functions in the server. 689// This method is only supported by Aerospike 3+ servers. 690// If the policy is nil, the default relevant policy will be used. 691func (clnt *Client) ListUDF(policy *BasePolicy) ([]*UDF, error) { 692 policy = clnt.getUsablePolicy(policy) 693 694 var strCmd bytes.Buffer 695 // errors are to remove errcheck warnings 696 // they will always be nil as stated in golang docs 697 _, err := strCmd.WriteString("udf-list") 698 699 // Send command to one node. That node will distribute it to other nodes. 700 responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String()) 701 if err != nil { 702 return nil, err 703 } 704 705 response := responseMap[strCmd.String()] 706 vals := strings.Split(response, ";") 707 res := make([]*UDF, 0, len(vals)) 708 709 for _, udfInfo := range vals { 710 if strings.Trim(udfInfo, " ") == "" { 711 continue 712 } 713 udfParts := strings.Split(udfInfo, ",") 714 715 udf := &UDF{} 716 for _, values := range udfParts { 717 valueParts := strings.Split(values, "=") 718 if len(valueParts) == 2 { 719 switch valueParts[0] { 720 case "filename": 721 udf.Filename = valueParts[1] 722 case "hash": 723 udf.Hash = valueParts[1] 724 case "type": 725 udf.Language = Language(valueParts[1]) 726 } 727 } 728 } 729 res = append(res, udf) 730 } 731 732 return res, nil 733} 734 735// Execute executes a user defined function on server and return results. 736// The function operates on a single record. 737// The package name is used to locate the udf file location: 738// 739// udf file = <server udf dir>/<package name>.lua 740// 741// This method is only supported by Aerospike 3+ servers. 742// If the policy is nil, the default relevant policy will be used. 743func (clnt *Client) Execute(policy *WritePolicy, key *Key, packageName string, functionName string, args ...Value) (interface{}, error) { 744 policy = clnt.getUsableWritePolicy(policy) 745 command, err := newExecuteCommand(clnt.cluster, policy, key, packageName, functionName, NewValueArray(args)) 746 if err != nil { 747 return nil, err 748 } 749 750 if err := command.Execute(); err != nil { 751 return nil, err 752 } 753 754 record := command.GetRecord() 755 756 if record == nil || len(record.Bins) == 0 { 757 return nil, nil 758 } 759 760 for k, v := range record.Bins { 761 if strings.Contains(k, "SUCCESS") { 762 return v, nil 763 } else if strings.Contains(k, "FAILURE") { 764 return nil, fmt.Errorf("%v", v) 765 } 766 } 767 768 return nil, ErrUDFBadResponse 769} 770 771//---------------------------------------------------------- 772// Query/Execute (Supported by Aerospike 3+ servers only) 773//---------------------------------------------------------- 774 775// QueryExecute applies operations on records that match the statement filter. 776// Records are not returned to the client. 777// This asynchronous server call will return before the command is complete. 778// The user can optionally wait for command completion by using the returned 779// ExecuteTask instance. 780// 781// This method is only supported by Aerospike 3+ servers. 782// If the policy is nil, the default relevant policy will be used. 783func (clnt *Client) QueryExecute(policy *QueryPolicy, 784 writePolicy *WritePolicy, 785 statement *Statement, 786 ops ...*Operation, 787) (*ExecuteTask, error) { 788 789 if len(ops) == 0 { 790 return nil, ErrNoOperationsSpecified 791 } 792 793 if len(statement.BinNames) > 0 { 794 return nil, ErrNoBinNamesAlloedInQueryExecute 795 } 796 797 policy = clnt.getUsableQueryPolicy(policy) 798 writePolicy = clnt.getUsableWritePolicy(writePolicy) 799 800 nodes := clnt.cluster.GetNodes() 801 if len(nodes) == 0 { 802 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "ExecuteOperations failed because cluster is empty.") 803 } 804 805 statement.prepare(false) 806 807 errs := []error{} 808 for i := range nodes { 809 command := newServerCommand(nodes[i], policy, writePolicy, statement, ops) 810 if err := command.Execute(); err != nil { 811 errs = append(errs, err) 812 } 813 } 814 815 return NewExecuteTask(clnt.cluster, statement), mergeErrors(errs) 816} 817 818// ExecuteUDF applies user defined function on records that match the statement filter. 819// Records are not returned to the client. 820// This asynchronous server call will return before command is complete. 821// The user can optionally wait for command completion by using the returned 822// ExecuteTask instance. 823// 824// This method is only supported by Aerospike 3+ servers. 825// If the policy is nil, the default relevant policy will be used. 826func (clnt *Client) ExecuteUDF(policy *QueryPolicy, 827 statement *Statement, 828 packageName string, 829 functionName string, 830 functionArgs ...Value, 831) (*ExecuteTask, error) { 832 policy = clnt.getUsableQueryPolicy(policy) 833 834 nodes := clnt.cluster.GetNodes() 835 if len(nodes) == 0 { 836 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "ExecuteUDF failed because cluster is empty.") 837 } 838 839 statement.SetAggregateFunction(packageName, functionName, functionArgs, false) 840 841 errs := []error{} 842 for i := range nodes { 843 command := newServerCommand(nodes[i], policy, nil, statement, nil) 844 if err := command.Execute(); err != nil { 845 errs = append(errs, err) 846 } 847 } 848 849 return NewExecuteTask(clnt.cluster, statement), mergeErrors(errs) 850} 851 852// ExecuteUDFNode applies user defined function on records that match the statement filter on the specified node. 853// Records are not returned to the client. 854// This asynchronous server call will return before command is complete. 855// The user can optionally wait for command completion by using the returned 856// ExecuteTask instance. 857// 858// This method is only supported by Aerospike 3+ servers. 859// If the policy is nil, the default relevant policy will be used. 860func (clnt *Client) ExecuteUDFNode(policy *QueryPolicy, 861 node *Node, 862 statement *Statement, 863 packageName string, 864 functionName string, 865 functionArgs ...Value, 866) (*ExecuteTask, error) { 867 policy = clnt.getUsableQueryPolicy(policy) 868 869 if node == nil { 870 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "ExecuteUDFNode failed because node is nil.") 871 } 872 873 statement.SetAggregateFunction(packageName, functionName, functionArgs, false) 874 875 command := newServerCommand(node, policy, nil, statement, nil) 876 err := command.Execute() 877 878 return NewExecuteTask(clnt.cluster, statement), err 879} 880 881//-------------------------------------------------------- 882// Query functions (Supported by Aerospike 3+ servers only) 883//-------------------------------------------------------- 884 885// Query executes a query and returns a Recordset. 886// The query executor puts records on the channel from separate goroutines. 887// The caller can concurrently pop records off the channel through the 888// Recordset.Records channel. 889// 890// This method is only supported by Aerospike 3+ servers. 891// If the policy is nil, the default relevant policy will be used. 892func (clnt *Client) Query(policy *QueryPolicy, statement *Statement) (*Recordset, error) { 893 policy = clnt.getUsableQueryPolicy(policy) 894 895 nodes := clnt.cluster.GetNodes() 896 if len(nodes) == 0 { 897 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.") 898 } 899 900 clusterKey := int64(0) 901 if policy.FailOnClusterChange { 902 var err error 903 clusterKey, err = queryValidateBegin(nodes[0], statement.Namespace) 904 if err != nil { 905 return nil, err 906 } 907 } 908 909 first := NewAtomicBool(true) 910 911 // results channel must be async for performance 912 recSet := newRecordset(policy.RecordQueueSize, len(nodes), statement.TaskId) 913 914 // results channel must be async for performance 915 for _, node := range nodes { 916 // copy policies to avoid race conditions 917 newPolicy := *policy 918 command := newQueryRecordCommand(node, &newPolicy, statement, recSet, clusterKey, first.CompareAndToggle(true)) 919 go func() { 920 command.Execute() 921 }() 922 } 923 924 return recSet, nil 925} 926 927// QueryNode executes a query on a specific node and returns a recordset. 928// The caller can concurrently pop records off the channel through the 929// record channel. 930// 931// This method is only supported by Aerospike 3+ servers. 932// If the policy is nil, the default relevant policy will be used. 933func (clnt *Client) QueryNode(policy *QueryPolicy, node *Node, statement *Statement) (*Recordset, error) { 934 policy = clnt.getUsableQueryPolicy(policy) 935 936 // results channel must be async for performance 937 recSet := newRecordset(policy.RecordQueueSize, 1, statement.TaskId) 938 939 clusterKey := int64(0) 940 if policy.FailOnClusterChange { 941 var err error 942 clusterKey, err = queryValidateBegin(node, statement.Namespace) 943 if err != nil { 944 return nil, err 945 } 946 } 947 948 // copy policies to avoid race conditions 949 newPolicy := *policy 950 command := newQueryRecordCommand(node, &newPolicy, statement, recSet, clusterKey, true) 951 go func() { 952 command.Execute() 953 }() 954 955 return recSet, nil 956} 957 958// CreateIndex creates a secondary index. 959// This asynchronous server call will return before the command is complete. 960// The user can optionally wait for command completion by using the returned 961// IndexTask instance. 962// This method is only supported by Aerospike 3+ servers. 963// If the policy is nil, the default relevant policy will be used. 964func (clnt *Client) CreateIndex( 965 policy *WritePolicy, 966 namespace string, 967 setName string, 968 indexName string, 969 binName string, 970 indexType IndexType, 971) (*IndexTask, error) { 972 policy = clnt.getUsableWritePolicy(policy) 973 return clnt.CreateComplexIndex(policy, namespace, setName, indexName, binName, indexType, ICT_DEFAULT) 974} 975 976// CreateComplexIndex creates a secondary index, with the ability to put indexes 977// on bin containing complex data types, e.g: Maps and Lists. 978// This asynchronous server call will return before the command is complete. 979// The user can optionally wait for command completion by using the returned 980// IndexTask instance. 981// This method is only supported by Aerospike 3+ servers. 982// If the policy is nil, the default relevant policy will be used. 983func (clnt *Client) CreateComplexIndex( 984 policy *WritePolicy, 985 namespace string, 986 setName string, 987 indexName string, 988 binName string, 989 indexType IndexType, 990 indexCollectionType IndexCollectionType, 991) (*IndexTask, error) { 992 policy = clnt.getUsableWritePolicy(policy) 993 994 var strCmd bytes.Buffer 995 _, err := strCmd.WriteString("sindex-create:ns=") 996 _, err = strCmd.WriteString(namespace) 997 998 if len(setName) > 0 { 999 _, err = strCmd.WriteString(";set=") 1000 _, err = strCmd.WriteString(setName) 1001 } 1002 1003 _, err = strCmd.WriteString(";indexname=") 1004 _, err = strCmd.WriteString(indexName) 1005 _, err = strCmd.WriteString(";numbins=1") 1006 1007 if indexCollectionType != ICT_DEFAULT { 1008 _, err = strCmd.WriteString(";indextype=") 1009 _, err = strCmd.WriteString(ictToString(indexCollectionType)) 1010 } 1011 1012 _, err = strCmd.WriteString(";indexdata=") 1013 _, err = strCmd.WriteString(binName) 1014 _, err = strCmd.WriteString(",") 1015 _, err = strCmd.WriteString(string(indexType)) 1016 _, err = strCmd.WriteString(";priority=normal") 1017 1018 // Send index command to one node. That node will distribute the command to other nodes. 1019 responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String()) 1020 if err != nil { 1021 return nil, err 1022 } 1023 1024 response := responseMap[strCmd.String()] 1025 if strings.EqualFold(response, "OK") { 1026 // Return task that could optionally be polled for completion. 1027 return NewIndexTask(clnt.cluster, namespace, indexName), nil 1028 } 1029 1030 if strings.HasPrefix(response, "FAIL:200") { 1031 // Index has already been created. Do not need to poll for completion. 1032 return nil, NewAerospikeError(INDEX_FOUND) 1033 } 1034 1035 return nil, NewAerospikeError(INDEX_GENERIC, "Create index failed: "+response) 1036} 1037 1038// DropIndex deletes a secondary index. It will block until index is dropped on all nodes. 1039// This method is only supported by Aerospike 3+ servers. 1040// If the policy is nil, the default relevant policy will be used. 1041func (clnt *Client) DropIndex( 1042 policy *WritePolicy, 1043 namespace string, 1044 setName string, 1045 indexName string, 1046) error { 1047 policy = clnt.getUsableWritePolicy(policy) 1048 var strCmd bytes.Buffer 1049 _, err := strCmd.WriteString("sindex-delete:ns=") 1050 _, err = strCmd.WriteString(namespace) 1051 1052 if len(setName) > 0 { 1053 _, err = strCmd.WriteString(";set=") 1054 _, err = strCmd.WriteString(setName) 1055 } 1056 _, err = strCmd.WriteString(";indexname=") 1057 _, err = strCmd.WriteString(indexName) 1058 1059 // Send index command to one node. That node will distribute the command to other nodes. 1060 responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String()) 1061 if err != nil { 1062 return err 1063 } 1064 1065 response := responseMap[strCmd.String()] 1066 1067 if strings.EqualFold(response, "OK") { 1068 // Return task that could optionally be polled for completion. 1069 task := NewDropIndexTask(clnt.cluster, namespace, indexName) 1070 return <-task.OnComplete() 1071 } 1072 1073 if strings.HasPrefix(response, "FAIL:201") { 1074 // Index did not previously exist. Return without error. 1075 return nil 1076 } 1077 1078 return NewAerospikeError(INDEX_GENERIC, "Drop index failed: "+response) 1079} 1080 1081// Truncate removes records in specified namespace/set efficiently. This method is many orders of magnitude 1082// faster than deleting records one at a time. Works with Aerospike Server versions >= 3.12. 1083// This asynchronous server call may return before the truncation is complete. The user can still 1084// write new records after the server call returns because new records will have last update times 1085// greater than the truncate cutoff (set at the time of truncate call). 1086// For more information, See https://www.aerospike.com/docs/reference/info#truncate 1087func (clnt *Client) Truncate(policy *WritePolicy, namespace, set string, beforeLastUpdate *time.Time) error { 1088 policy = clnt.getUsableWritePolicy(policy) 1089 1090 node, err := clnt.cluster.GetRandomNode() 1091 if err != nil { 1092 return err 1093 } 1094 1095 node.tendConnLock.Lock() 1096 defer node.tendConnLock.Unlock() 1097 1098 if err := node.initTendConn(policy.TotalTimeout); err != nil { 1099 return err 1100 } 1101 1102 var strCmd bytes.Buffer 1103 if len(set) > 0 { 1104 _, err = strCmd.WriteString("truncate:namespace=") 1105 _, err = strCmd.WriteString(namespace) 1106 _, err = strCmd.WriteString(";set=") 1107 _, err = strCmd.WriteString(set) 1108 } else { 1109 // Servers >= 4.5.1.0 support truncate-namespace. 1110 if node.supportsTruncateNamespace.Get() { 1111 _, err = strCmd.WriteString("truncate-namespace:namespace=") 1112 _, err = strCmd.WriteString(namespace) 1113 } else { 1114 _, err = strCmd.WriteString("truncate:namespace=") 1115 _, err = strCmd.WriteString(namespace) 1116 } 1117 } 1118 if beforeLastUpdate != nil { 1119 _, err = strCmd.WriteString(";lut=") 1120 _, err = strCmd.WriteString(strconv.FormatInt(beforeLastUpdate.UnixNano(), 10)) 1121 } else { 1122 // Servers >= 4.3.1.4 and <= 4.5.0.1 require lut argument. 1123 if node.supportsLUTNow.Get() { 1124 _, err = strCmd.WriteString(";lut=now") 1125 } 1126 } 1127 1128 responseMap, err := RequestInfo(node.tendConn, strCmd.String()) 1129 if err != nil { 1130 node.tendConn.Close() 1131 return err 1132 } 1133 1134 response := responseMap[strCmd.String()] 1135 if strings.EqualFold(response, "OK") { 1136 return nil 1137 } 1138 1139 return NewAerospikeError(SERVER_ERROR, "Truncate failed: "+response) 1140} 1141 1142//------------------------------------------------------- 1143// User administration 1144//------------------------------------------------------- 1145 1146// CreateUser creates a new user with password and roles. Clear-text password will be hashed using bcrypt 1147// before sending to server. 1148func (clnt *Client) CreateUser(policy *AdminPolicy, user string, password string, roles []string) error { 1149 policy = clnt.getUsableAdminPolicy(policy) 1150 1151 hash, err := hashPassword(password) 1152 if err != nil { 1153 return err 1154 } 1155 command := newAdminCommand(nil) 1156 return command.createUser(clnt.cluster, policy, user, hash, roles) 1157} 1158 1159// DropUser removes a user from the cluster. 1160func (clnt *Client) DropUser(policy *AdminPolicy, user string) error { 1161 policy = clnt.getUsableAdminPolicy(policy) 1162 1163 command := newAdminCommand(nil) 1164 return command.dropUser(clnt.cluster, policy, user) 1165} 1166 1167// ChangePassword changes a user's password. Clear-text password will be hashed using bcrypt before sending to server. 1168func (clnt *Client) ChangePassword(policy *AdminPolicy, user string, password string) error { 1169 policy = clnt.getUsableAdminPolicy(policy) 1170 1171 if clnt.cluster.user == "" { 1172 return NewAerospikeError(INVALID_USER) 1173 } 1174 1175 hash, err := hashPassword(password) 1176 if err != nil { 1177 return err 1178 } 1179 command := newAdminCommand(nil) 1180 1181 if user == clnt.cluster.user { 1182 // Change own password. 1183 if err := command.changePassword(clnt.cluster, policy, user, hash); err != nil { 1184 return err 1185 } 1186 } else { 1187 // Change other user's password by user admin. 1188 if err := command.setPassword(clnt.cluster, policy, user, hash); err != nil { 1189 return err 1190 } 1191 } 1192 1193 clnt.cluster.changePassword(user, password, hash) 1194 1195 return nil 1196} 1197 1198// GrantRoles adds roles to user's list of roles. 1199func (clnt *Client) GrantRoles(policy *AdminPolicy, user string, roles []string) error { 1200 policy = clnt.getUsableAdminPolicy(policy) 1201 1202 command := newAdminCommand(nil) 1203 return command.grantRoles(clnt.cluster, policy, user, roles) 1204} 1205 1206// RevokeRoles removes roles from user's list of roles. 1207func (clnt *Client) RevokeRoles(policy *AdminPolicy, user string, roles []string) error { 1208 policy = clnt.getUsableAdminPolicy(policy) 1209 1210 command := newAdminCommand(nil) 1211 return command.revokeRoles(clnt.cluster, policy, user, roles) 1212} 1213 1214// QueryUser retrieves roles for a given user. 1215func (clnt *Client) QueryUser(policy *AdminPolicy, user string) (*UserRoles, error) { 1216 policy = clnt.getUsableAdminPolicy(policy) 1217 1218 command := newAdminCommand(nil) 1219 return command.queryUser(clnt.cluster, policy, user) 1220} 1221 1222// QueryUsers retrieves all users and their roles. 1223func (clnt *Client) QueryUsers(policy *AdminPolicy) ([]*UserRoles, error) { 1224 policy = clnt.getUsableAdminPolicy(policy) 1225 1226 command := newAdminCommand(nil) 1227 return command.queryUsers(clnt.cluster, policy) 1228} 1229 1230// QueryRole retrieves privileges for a given role. 1231func (clnt *Client) QueryRole(policy *AdminPolicy, role string) (*Role, error) { 1232 policy = clnt.getUsableAdminPolicy(policy) 1233 1234 command := newAdminCommand(nil) 1235 return command.queryRole(clnt.cluster, policy, role) 1236} 1237 1238// QueryRoles retrieves all roles and their privileges. 1239func (clnt *Client) QueryRoles(policy *AdminPolicy) ([]*Role, error) { 1240 policy = clnt.getUsableAdminPolicy(policy) 1241 1242 command := newAdminCommand(nil) 1243 return command.queryRoles(clnt.cluster, policy) 1244} 1245 1246// CreateRole creates a user-defined role. 1247func (clnt *Client) CreateRole(policy *AdminPolicy, roleName string, privileges []Privilege, whitelist []string) error { 1248 policy = clnt.getUsableAdminPolicy(policy) 1249 1250 command := newAdminCommand(nil) 1251 return command.createRole(clnt.cluster, policy, roleName, privileges, whitelist) 1252} 1253 1254// DropRole removes a user-defined role. 1255func (clnt *Client) DropRole(policy *AdminPolicy, roleName string) error { 1256 policy = clnt.getUsableAdminPolicy(policy) 1257 1258 command := newAdminCommand(nil) 1259 return command.dropRole(clnt.cluster, policy, roleName) 1260} 1261 1262// GrantPrivileges grant privileges to a user-defined role. 1263func (clnt *Client) GrantPrivileges(policy *AdminPolicy, roleName string, privileges []Privilege) error { 1264 policy = clnt.getUsableAdminPolicy(policy) 1265 1266 command := newAdminCommand(nil) 1267 return command.grantPrivileges(clnt.cluster, policy, roleName, privileges) 1268} 1269 1270// RevokePrivileges revokes privileges from a user-defined role. 1271func (clnt *Client) RevokePrivileges(policy *AdminPolicy, roleName string, privileges []Privilege) error { 1272 policy = clnt.getUsableAdminPolicy(policy) 1273 1274 command := newAdminCommand(nil) 1275 return command.revokePrivileges(clnt.cluster, policy, roleName, privileges) 1276} 1277 1278// SetWhitelist sets IP address whitelist for a role. If whitelist is nil or empty, it removes existing whitelist from role. 1279func (clnt *Client) SetWhitelist(policy *AdminPolicy, roleName string, whitelist []string) error { 1280 policy = clnt.getUsableAdminPolicy(policy) 1281 1282 command := newAdminCommand(nil) 1283 return command.setWhitelist(clnt.cluster, policy, roleName, whitelist) 1284} 1285 1286// Cluster exposes the cluster object to the user 1287func (clnt *Client) Cluster() *Cluster { 1288 return clnt.cluster 1289} 1290 1291// String implements the Stringer interface for client 1292func (clnt *Client) String() string { 1293 if clnt.cluster != nil { 1294 return clnt.cluster.String() 1295 } 1296 return "" 1297} 1298 1299// Stats returns internal statistics regarding the inner state of the client and the cluster. 1300func (clnt *Client) Stats() (map[string]interface{}, error) { 1301 resStats := clnt.cluster.statsCopy() 1302 1303 clusterStats := nodeStats{} 1304 for _, stats := range resStats { 1305 clusterStats.aggregate(&stats) 1306 } 1307 1308 resStats["cluster-aggregated-stats"] = clusterStats 1309 1310 b, err := json.Marshal(resStats) 1311 if err != nil { 1312 return nil, err 1313 } 1314 1315 res := map[string]interface{}{} 1316 err = json.Unmarshal(b, &res) 1317 if err != nil { 1318 return nil, err 1319 } 1320 1321 res["open-connections"] = clusterStats.ConnectionsOpen 1322 1323 return res, nil 1324} 1325 1326// WarmUp fills the connection pool with connections for all nodes. 1327// This is necessary on startup for high traffic programs. 1328// If the count is <= 0, the connection queue will be filled. 1329// If the count is more than the size of the pool, the pool will be filled. 1330// Note: One connection per node is reserved for tend operations and is not used for transactions. 1331func (clnt *Client) WarmUp(count int) (int, error) { 1332 return clnt.cluster.WarmUp(count) 1333} 1334 1335//------------------------------------------------------- 1336// Internal Methods 1337//------------------------------------------------------- 1338 1339func (clnt *Client) sendInfoCommand(timeout time.Duration, command string) (map[string]string, error) { 1340 node, err := clnt.cluster.GetRandomNode() 1341 if err != nil { 1342 return nil, err 1343 } 1344 1345 node.tendConnLock.Lock() 1346 defer node.tendConnLock.Unlock() 1347 1348 if err := node.initTendConn(timeout); err != nil { 1349 return nil, err 1350 } 1351 1352 results, err := RequestInfo(node.tendConn, command) 1353 if err != nil { 1354 node.tendConn.Close() 1355 return nil, err 1356 } 1357 1358 return results, nil 1359} 1360 1361// batchExecute Uses sync.WaitGroup to run commands using multiple goroutines, 1362// and waits for their return 1363func (clnt *Client) batchExecute(policy *BatchPolicy, batchNodes []*batchNode, cmd batcher) (error, int) { 1364 var wg sync.WaitGroup 1365 filteredOut := 0 1366 1367 // Use a goroutine per namespace per node 1368 errs := []error{} 1369 errm := new(sync.Mutex) 1370 1371 wg.Add(len(batchNodes)) 1372 if policy.ConcurrentNodes <= 0 { 1373 for _, batchNode := range batchNodes { 1374 newCmd := cmd.cloneBatchCommand(batchNode) 1375 go func(cmd command) { 1376 defer wg.Done() 1377 err := cmd.Execute() 1378 errm.Lock() 1379 if err != nil { 1380 errs = append(errs, err) 1381 } 1382 filteredOut += cmd.(batcher).filteredOut() 1383 errm.Unlock() 1384 }(newCmd) 1385 } 1386 } else { 1387 sem := semaphore.NewWeighted(int64(policy.ConcurrentNodes)) 1388 ctx := context.Background() 1389 1390 for _, batchNode := range batchNodes { 1391 if err := sem.Acquire(ctx, 1); err != nil { 1392 errm.Lock() 1393 if err != nil { 1394 errs = append(errs, err) 1395 } 1396 errm.Unlock() 1397 continue 1398 } 1399 1400 newCmd := cmd.cloneBatchCommand(batchNode) 1401 go func(cmd command) { 1402 defer sem.Release(1) 1403 defer wg.Done() 1404 err := cmd.Execute() 1405 errm.Lock() 1406 if err != nil { 1407 errs = append(errs, err) 1408 } 1409 filteredOut += cmd.(batcher).filteredOut() 1410 errm.Unlock() 1411 }(newCmd) 1412 } 1413 } 1414 1415 wg.Wait() 1416 return mergeErrors(errs), filteredOut 1417} 1418 1419func (clnt *Client) getUsablePolicy(policy *BasePolicy) *BasePolicy { 1420 if policy == nil { 1421 if clnt.DefaultPolicy != nil { 1422 return clnt.DefaultPolicy 1423 } 1424 return NewPolicy() 1425 } 1426 return policy 1427} 1428 1429func (clnt *Client) getUsableBatchPolicy(policy *BatchPolicy) *BatchPolicy { 1430 if policy == nil { 1431 if clnt.DefaultBatchPolicy != nil { 1432 return clnt.DefaultBatchPolicy 1433 } 1434 return NewBatchPolicy() 1435 } 1436 return policy 1437} 1438 1439func (clnt *Client) getUsableWritePolicy(policy *WritePolicy) *WritePolicy { 1440 if policy == nil { 1441 if clnt.DefaultWritePolicy != nil { 1442 return clnt.DefaultWritePolicy 1443 } 1444 return NewWritePolicy(0, 0) 1445 } 1446 return policy 1447} 1448 1449func (clnt *Client) getUsableScanPolicy(policy *ScanPolicy) *ScanPolicy { 1450 if policy == nil { 1451 if clnt.DefaultScanPolicy != nil { 1452 return clnt.DefaultScanPolicy 1453 } 1454 return NewScanPolicy() 1455 } 1456 return policy 1457} 1458 1459func (clnt *Client) getUsableQueryPolicy(policy *QueryPolicy) *QueryPolicy { 1460 if policy == nil { 1461 if clnt.DefaultQueryPolicy != nil { 1462 return clnt.DefaultQueryPolicy 1463 } 1464 return NewQueryPolicy() 1465 } 1466 return policy 1467} 1468 1469func (clnt *Client) getUsableAdminPolicy(policy *AdminPolicy) *AdminPolicy { 1470 if policy == nil { 1471 if clnt.DefaultAdminPolicy != nil { 1472 return clnt.DefaultAdminPolicy 1473 } 1474 return NewAdminPolicy() 1475 } 1476 return policy 1477} 1478 1479//------------------------------------------------------- 1480// Utility Functions 1481//------------------------------------------------------- 1482 1483// mergeErrors merges several errors into one 1484func mergeErrors(errs []error) error { 1485 switch len(errs) { 1486 case 0: 1487 return nil 1488 case 1: 1489 return errs[0] 1490 } 1491 1492 var msg bytes.Buffer 1493 for _, err := range errs { 1494 if _, err := msg.WriteString(err.Error()); err != nil { 1495 return err 1496 } 1497 if _, err := msg.WriteString("\n"); err != nil { 1498 return err 1499 } 1500 } 1501 return errors.New(msg.String()) 1502} 1503