1// Copyright 2013-2020 Aerospike, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aerospike
16
17import (
18	"bytes"
19	"context"
20	"encoding/base64"
21	"encoding/json"
22	"errors"
23	"fmt"
24	"io/ioutil"
25	"runtime"
26	"strconv"
27	"strings"
28	"sync"
29	"time"
30
31	"golang.org/x/sync/semaphore"
32
33	. "github.com/aerospike/aerospike-client-go/internal/atomic"
34	. "github.com/aerospike/aerospike-client-go/logger"
35	. "github.com/aerospike/aerospike-client-go/types"
36	xornd "github.com/aerospike/aerospike-client-go/types/rand"
37)
38
39// Client encapsulates an Aerospike cluster.
40// All database operations are available against this object.
41type Client struct {
42	cluster *Cluster
43
44	// DefaultPolicy is used for all read commands without a specific policy.
45	DefaultPolicy *BasePolicy
46	// DefaultBatchPolicy is used for all batch commands without a specific policy.
47	DefaultBatchPolicy *BatchPolicy
48	// DefaultWritePolicy is used for all write commands without a specific policy.
49	DefaultWritePolicy *WritePolicy
50	// DefaultScanPolicy is used for all scan commands without a specific policy.
51	DefaultScanPolicy *ScanPolicy
52	// DefaultQueryPolicy is used for all query commands without a specific policy.
53	DefaultQueryPolicy *QueryPolicy
54	// DefaultAdminPolicy is used for all security commands without a specific policy.
55	DefaultAdminPolicy *AdminPolicy
56}
57
58func clientFinalizer(f *Client) {
59	f.Close()
60}
61
62//-------------------------------------------------------
63// Constructors
64//-------------------------------------------------------
65
66// NewClient generates a new Client instance.
67func NewClient(hostname string, port int) (*Client, error) {
68	return NewClientWithPolicyAndHost(NewClientPolicy(), NewHost(hostname, port))
69}
70
71// NewClientWithPolicy generates a new Client using the specified ClientPolicy.
72// If the policy is nil, the default relevant policy will be used.
73func NewClientWithPolicy(policy *ClientPolicy, hostname string, port int) (*Client, error) {
74	return NewClientWithPolicyAndHost(policy, NewHost(hostname, port))
75}
76
77// NewClientWithPolicyAndHost generates a new Client the specified ClientPolicy and
78// sets up the cluster using the provided hosts.
79// If the policy is nil, the default relevant policy will be used.
80func NewClientWithPolicyAndHost(policy *ClientPolicy, hosts ...*Host) (*Client, error) {
81	if policy == nil {
82		policy = NewClientPolicy()
83	}
84
85	cluster, err := NewCluster(policy, hosts)
86	if err != nil && policy.FailIfNotConnected {
87		if aerr, ok := err.(AerospikeError); ok {
88			Logger.Debug("Failed to connect to host(s): %v; error: %s", hosts, err)
89			return nil, aerr
90		}
91		return nil, fmt.Errorf("Failed to connect to host(s): %v; error: %s", hosts, err)
92	}
93
94	client := &Client{
95		cluster:            cluster,
96		DefaultPolicy:      NewPolicy(),
97		DefaultBatchPolicy: NewBatchPolicy(),
98		DefaultWritePolicy: NewWritePolicy(0, 0),
99		DefaultScanPolicy:  NewScanPolicy(),
100		DefaultQueryPolicy: NewQueryPolicy(),
101		DefaultAdminPolicy: NewAdminPolicy(),
102	}
103
104	runtime.SetFinalizer(client, clientFinalizer)
105	return client, err
106
107}
108
109//-------------------------------------------------------
110// Cluster Connection Management
111//-------------------------------------------------------
112
113// Close closes all client connections to database server nodes.
114func (clnt *Client) Close() {
115	clnt.cluster.Close()
116}
117
118// IsConnected determines if the client is ready to talk to the database server cluster.
119func (clnt *Client) IsConnected() bool {
120	return clnt.cluster.IsConnected()
121}
122
123// GetNodes returns an array of active server nodes in the cluster.
124func (clnt *Client) GetNodes() []*Node {
125	return clnt.cluster.GetNodes()
126}
127
128// GetNodeNames returns a list of active server node names in the cluster.
129func (clnt *Client) GetNodeNames() []string {
130	nodes := clnt.cluster.GetNodes()
131	names := make([]string, 0, len(nodes))
132
133	for _, node := range nodes {
134		names = append(names, node.GetName())
135	}
136	return names
137}
138
139//-------------------------------------------------------
140// Write Record Operations
141//-------------------------------------------------------
142
143// Put writes record bin(s) to the server.
144// The policy specifies the transaction timeout, record expiration and how the transaction is
145// handled when the record already exists.
146// If the policy is nil, the default relevant policy will be used.
147func (clnt *Client) Put(policy *WritePolicy, key *Key, binMap BinMap) error {
148	policy = clnt.getUsableWritePolicy(policy)
149	command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _WRITE)
150	if err != nil {
151		return err
152	}
153
154	return command.Execute()
155}
156
157// PutBins writes record bin(s) to the server.
158// The policy specifies the transaction timeout, record expiration and how the transaction is
159// handled when the record already exists.
160// This method avoids using the BinMap allocation and iteration and is lighter on GC.
161// If the policy is nil, the default relevant policy will be used.
162func (clnt *Client) PutBins(policy *WritePolicy, key *Key, bins ...*Bin) error {
163	policy = clnt.getUsableWritePolicy(policy)
164	command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _WRITE)
165	if err != nil {
166		return err
167	}
168
169	return command.Execute()
170}
171
172//-------------------------------------------------------
173// Operations string
174//-------------------------------------------------------
175
176// Append appends bin value's string to existing record bin values.
177// The policy specifies the transaction timeout, record expiration and how the transaction is
178// handled when the record already exists.
179// This call only works for string and []byte values.
180// If the policy is nil, the default relevant policy will be used.
181func (clnt *Client) Append(policy *WritePolicy, key *Key, binMap BinMap) error {
182	policy = clnt.getUsableWritePolicy(policy)
183	command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _APPEND)
184	if err != nil {
185		return err
186	}
187
188	return command.Execute()
189}
190
191// AppendBins works the same as Append, but avoids BinMap allocation and iteration.
192func (clnt *Client) AppendBins(policy *WritePolicy, key *Key, bins ...*Bin) error {
193	policy = clnt.getUsableWritePolicy(policy)
194	command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _APPEND)
195	if err != nil {
196		return err
197	}
198
199	return command.Execute()
200}
201
202// Prepend prepends bin value's string to existing record bin values.
203// The policy specifies the transaction timeout, record expiration and how the transaction is
204// handled when the record already exists.
205// This call works only for string and []byte values.
206// If the policy is nil, the default relevant policy will be used.
207func (clnt *Client) Prepend(policy *WritePolicy, key *Key, binMap BinMap) error {
208	policy = clnt.getUsableWritePolicy(policy)
209	command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _PREPEND)
210	if err != nil {
211		return err
212	}
213
214	return command.Execute()
215}
216
217// PrependBins works the same as Prepend, but avoids BinMap allocation and iteration.
218func (clnt *Client) PrependBins(policy *WritePolicy, key *Key, bins ...*Bin) error {
219	policy = clnt.getUsableWritePolicy(policy)
220	command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _PREPEND)
221	if err != nil {
222		return err
223	}
224
225	return command.Execute()
226}
227
228//-------------------------------------------------------
229// Arithmetic Operations
230//-------------------------------------------------------
231
232// Add adds integer bin values to existing record bin values.
233// The policy specifies the transaction timeout, record expiration and how the transaction is
234// handled when the record already exists.
235// This call only works for integer values.
236// If the policy is nil, the default relevant policy will be used.
237func (clnt *Client) Add(policy *WritePolicy, key *Key, binMap BinMap) error {
238	policy = clnt.getUsableWritePolicy(policy)
239	command, err := newWriteCommand(clnt.cluster, policy, key, nil, binMap, _ADD)
240	if err != nil {
241		return err
242	}
243
244	return command.Execute()
245}
246
247// AddBins works the same as Add, but avoids BinMap allocation and iteration.
248func (clnt *Client) AddBins(policy *WritePolicy, key *Key, bins ...*Bin) error {
249	policy = clnt.getUsableWritePolicy(policy)
250	command, err := newWriteCommand(clnt.cluster, policy, key, bins, nil, _ADD)
251	if err != nil {
252		return err
253	}
254
255	return command.Execute()
256}
257
258//-------------------------------------------------------
259// Delete Operations
260//-------------------------------------------------------
261
262// Delete deletes a record for specified key.
263// The policy specifies the transaction timeout.
264// If the policy is nil, the default relevant policy will be used.
265func (clnt *Client) Delete(policy *WritePolicy, key *Key) (bool, error) {
266	policy = clnt.getUsableWritePolicy(policy)
267	command, err := newDeleteCommand(clnt.cluster, policy, key)
268	if err != nil {
269		return false, err
270	}
271
272	err = command.Execute()
273	return command.Existed(), err
274}
275
276//-------------------------------------------------------
277// Touch Operations
278//-------------------------------------------------------
279
280// Touch updates a record's metadata.
281// If the record exists, the record's TTL will be reset to the
282// policy's expiration.
283// If the record doesn't exist, it will return an error.
284func (clnt *Client) Touch(policy *WritePolicy, key *Key) error {
285	policy = clnt.getUsableWritePolicy(policy)
286	command, err := newTouchCommand(clnt.cluster, policy, key)
287	if err != nil {
288		return err
289	}
290
291	return command.Execute()
292}
293
294//-------------------------------------------------------
295// Existence-Check Operations
296//-------------------------------------------------------
297
298// Exists determine if a record key exists.
299// The policy can be used to specify timeouts.
300// If the policy is nil, the default relevant policy will be used.
301func (clnt *Client) Exists(policy *BasePolicy, key *Key) (bool, error) {
302	policy = clnt.getUsablePolicy(policy)
303	command, err := newExistsCommand(clnt.cluster, policy, key)
304	if err != nil {
305		return false, err
306	}
307
308	err = command.Execute()
309	return command.Exists(), err
310}
311
312// BatchExists determines if multiple record keys exist in one batch request.
313// The returned boolean array is in positional order with the original key array order.
314// The policy can be used to specify timeouts.
315// If the policy is nil, the default relevant policy will be used.
316func (clnt *Client) BatchExists(policy *BatchPolicy, keys []*Key) ([]bool, error) {
317	policy = clnt.getUsableBatchPolicy(policy)
318
319	// same array can be used without synchronization;
320	// when a key exists, the corresponding index will be marked true
321	existsArray := make([]bool, len(keys))
322
323	batchNodes, err := newBatchNodeList(clnt.cluster, policy, keys)
324	if err != nil {
325		return nil, err
326	}
327
328	// pass nil to make sure it will be cloned and prepared
329	cmd := newBatchCommandExists(nil, nil, policy, keys, existsArray)
330	err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd)
331	if err != nil {
332		return nil, err
333	}
334
335	if filteredOut > 0 {
336		err = ErrFilteredOut
337	}
338
339	return existsArray, err
340}
341
342//-------------------------------------------------------
343// Read Record Operations
344//-------------------------------------------------------
345
346// Get reads a record header and bins for specified key.
347// The policy can be used to specify timeouts.
348// If the policy is nil, the default relevant policy will be used.
349func (clnt *Client) Get(policy *BasePolicy, key *Key, binNames ...string) (*Record, error) {
350	policy = clnt.getUsablePolicy(policy)
351
352	command, err := newReadCommand(clnt.cluster, policy, key, binNames, nil)
353	if err != nil {
354		return nil, err
355	}
356
357	if err := command.Execute(); err != nil {
358		return nil, err
359	}
360	return command.GetRecord(), nil
361}
362
363// GetHeader reads a record generation and expiration only for specified key.
364// Bins are not read.
365// The policy can be used to specify timeouts.
366// If the policy is nil, the default relevant policy will be used.
367func (clnt *Client) GetHeader(policy *BasePolicy, key *Key) (*Record, error) {
368	policy = clnt.getUsablePolicy(policy)
369
370	command, err := newReadHeaderCommand(clnt.cluster, policy, key)
371	if err != nil {
372		return nil, err
373	}
374
375	if err := command.Execute(); err != nil {
376		return nil, err
377	}
378	return command.GetRecord(), nil
379}
380
381//-------------------------------------------------------
382// Batch Read Operations
383//-------------------------------------------------------
384
385// BatchGet reads multiple record headers and bins for specified keys in one batch request.
386// The returned records are in positional order with the original key array order.
387// If a key is not found, the positional record will be nil.
388// The policy can be used to specify timeouts.
389// If the policy is nil, the default relevant policy will be used.
390func (clnt *Client) BatchGet(policy *BatchPolicy, keys []*Key, binNames ...string) ([]*Record, error) {
391	policy = clnt.getUsableBatchPolicy(policy)
392
393	// same array can be used without synchronization;
394	// when a key exists, the corresponding index will be set to record
395	records := make([]*Record, len(keys))
396
397	batchNodes, err := newBatchNodeList(clnt.cluster, policy, keys)
398	if err != nil {
399		return nil, err
400	}
401
402	cmd := newBatchCommandGet(nil, nil, policy, keys, binNames, records, _INFO1_READ)
403	err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd)
404	if err != nil && !policy.AllowPartialResults {
405		return nil, err
406	}
407
408	if filteredOut > 0 {
409		if err == nil {
410			err = ErrFilteredOut
411		} else {
412			mergeErrors([]error{err, ErrFilteredOut})
413		}
414	}
415
416	return records, err
417}
418
419// BatchGetComplex reads multiple records for specified batch keys in one batch call.
420// This method allows different namespaces/bins to be requested for each key in the batch.
421// The returned records are located in the same list.
422// If the BatchRead key field is not found, the corresponding record field will be null.
423// The policy can be used to specify timeouts and maximum concurrent threads.
424// This method requires Aerospike Server version >= 3.6.0.
425func (clnt *Client) BatchGetComplex(policy *BatchPolicy, records []*BatchRead) error {
426	policy = clnt.getUsableBatchPolicy(policy)
427
428	cmd := newBatchIndexCommandGet(nil, policy, records)
429
430	batchNodes, err := newBatchIndexNodeList(clnt.cluster, policy, records)
431	if err != nil {
432		return err
433	}
434
435	err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd)
436	if err != nil && !policy.AllowPartialResults {
437		return err
438	}
439
440	if filteredOut > 0 {
441		if err == nil {
442			err = ErrFilteredOut
443		} else {
444			mergeErrors([]error{err, ErrFilteredOut})
445		}
446	}
447
448	return err
449}
450
451// BatchGetHeader reads multiple record header data for specified keys in one batch request.
452// The returned records are in positional order with the original key array order.
453// If a key is not found, the positional record will be nil.
454// The policy can be used to specify timeouts.
455// If the policy is nil, the default relevant policy will be used.
456func (clnt *Client) BatchGetHeader(policy *BatchPolicy, keys []*Key) ([]*Record, error) {
457	policy = clnt.getUsableBatchPolicy(policy)
458
459	// same array can be used without synchronization;
460	// when a key exists, the corresponding index will be set to record
461	records := make([]*Record, len(keys))
462
463	batchNodes, err := newBatchNodeList(clnt.cluster, policy, keys)
464	if err != nil {
465		return nil, err
466	}
467
468	cmd := newBatchCommandGet(nil, nil, policy, keys, nil, records, _INFO1_READ|_INFO1_NOBINDATA)
469	err, filteredOut := clnt.batchExecute(policy, batchNodes, cmd)
470	if err != nil && !policy.AllowPartialResults {
471		return nil, err
472	}
473
474	if filteredOut > 0 {
475		if err == nil {
476			err = ErrFilteredOut
477		} else {
478			mergeErrors([]error{err, ErrFilteredOut})
479		}
480	}
481
482	return records, err
483}
484
485//-------------------------------------------------------
486// Generic Database Operations
487//-------------------------------------------------------
488
489// Operate performs multiple read/write operations on a single key in one batch request.
490// An example would be to add an integer value to an existing record and then
491// read the result, all in one database call.
492//
493// If the policy is nil, the default relevant policy will be used.
494func (clnt *Client) Operate(policy *WritePolicy, key *Key, operations ...*Operation) (*Record, error) {
495	policy = clnt.getUsableWritePolicy(policy)
496	command, err := newOperateCommand(clnt.cluster, policy, key, operations)
497	if err != nil {
498		return nil, err
499	}
500
501	if err := command.Execute(); err != nil {
502		return nil, err
503	}
504	return command.GetRecord(), nil
505}
506
507//-------------------------------------------------------
508// Scan Operations
509//-------------------------------------------------------
510
511// ScanAll reads all records in specified namespace and set from all nodes.
512// If the policy's concurrentNodes is specified, each server node will be read in
513// parallel. Otherwise, server nodes are read sequentially.
514// If the policy is nil, the default relevant policy will be used.
515func (clnt *Client) ScanAll(apolicy *ScanPolicy, namespace string, setName string, binNames ...string) (*Recordset, error) {
516	policy := *clnt.getUsableScanPolicy(apolicy)
517
518	nodes := clnt.cluster.GetNodes()
519	if len(nodes) == 0 {
520		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "Scan failed because cluster is empty.")
521	}
522
523	clusterKey := int64(0)
524	if policy.FailOnClusterChange {
525		var err error
526		clusterKey, err = queryValidateBegin(nodes[0], namespace)
527		if err != nil {
528			return nil, err
529		}
530	}
531
532	first := NewAtomicBool(true)
533
534	taskID := uint64(xornd.Int64())
535
536	// result recordset
537	res := newRecordset(policy.RecordQueueSize, len(nodes), taskID)
538
539	// the whole call should be wrapped in a goroutine
540	if policy.ConcurrentNodes {
541		for _, node := range nodes {
542			go func(node *Node, first bool) {
543				clnt.scanNode(&policy, node, res, namespace, setName, taskID, clusterKey, first, binNames...)
544			}(node, first.CompareAndToggle(true))
545		}
546	} else {
547		// scan nodes one by one
548		go func() {
549			for _, node := range nodes {
550				clnt.scanNode(&policy, node, res, namespace, setName, taskID, clusterKey, first.CompareAndToggle(true), binNames...)
551			}
552		}()
553	}
554
555	return res, nil
556}
557
558// ScanNode reads all records in specified namespace and set for one node only.
559// If the policy is nil, the default relevant policy will be used.
560func (clnt *Client) ScanNode(apolicy *ScanPolicy, node *Node, namespace string, setName string, binNames ...string) (*Recordset, error) {
561	policy := *clnt.getUsableScanPolicy(apolicy)
562
563	clusterKey := int64(0)
564	if policy.FailOnClusterChange {
565		var err error
566		clusterKey, err = queryValidateBegin(node, namespace)
567		if err != nil {
568			return nil, err
569		}
570	}
571
572	// results channel must be async for performance
573	taskID := uint64(xornd.Int64())
574	res := newRecordset(policy.RecordQueueSize, 1, taskID)
575
576	go clnt.scanNode(&policy, node, res, namespace, setName, taskID, clusterKey, true, binNames...)
577	return res, nil
578}
579
580// ScanNode reads all records in specified namespace and set for one node only.
581// If the policy is nil, the default relevant policy will be used.
582func (clnt *Client) scanNode(policy *ScanPolicy, node *Node, recordset *Recordset, namespace string, setName string, taskID uint64, clusterKey int64, first bool, binNames ...string) error {
583	command := newScanCommand(node, policy, namespace, setName, binNames, recordset, taskID, clusterKey, first)
584	return command.Execute()
585}
586
587//---------------------------------------------------------------
588// User defined functions (Supported by Aerospike 3+ servers only)
589//---------------------------------------------------------------
590
591// RegisterUDFFromFile reads a file from file system and registers
592// the containing a package user defined functions with the server.
593// This asynchronous server call will return before command is complete.
594// The user can optionally wait for command completion by using the returned
595// RegisterTask instance.
596//
597// This method is only supported by Aerospike 3+ servers.
598// If the policy is nil, the default relevant policy will be used.
599func (clnt *Client) RegisterUDFFromFile(policy *WritePolicy, clientPath string, serverPath string, language Language) (*RegisterTask, error) {
600	policy = clnt.getUsableWritePolicy(policy)
601	udfBody, err := ioutil.ReadFile(clientPath)
602	if err != nil {
603		return nil, err
604	}
605
606	return clnt.RegisterUDF(policy, udfBody, serverPath, language)
607}
608
609// RegisterUDF registers a package containing user defined functions with server.
610// This asynchronous server call will return before command is complete.
611// The user can optionally wait for command completion by using the returned
612// RegisterTask instance.
613//
614// This method is only supported by Aerospike 3+ servers.
615// If the policy is nil, the default relevant policy will be used.
616func (clnt *Client) RegisterUDF(policy *WritePolicy, udfBody []byte, serverPath string, language Language) (*RegisterTask, error) {
617	policy = clnt.getUsableWritePolicy(policy)
618	content := base64.StdEncoding.EncodeToString(udfBody)
619
620	var strCmd bytes.Buffer
621	// errors are to remove errcheck warnings
622	// they will always be nil as stated in golang docs
623	_, err := strCmd.WriteString("udf-put:filename=")
624	_, err = strCmd.WriteString(serverPath)
625	_, err = strCmd.WriteString(";content=")
626	_, err = strCmd.WriteString(content)
627	_, err = strCmd.WriteString(";content-len=")
628	_, err = strCmd.WriteString(strconv.Itoa(len(content)))
629	_, err = strCmd.WriteString(";udf-type=")
630	_, err = strCmd.WriteString(string(language))
631	_, err = strCmd.WriteString(";")
632
633	// Send UDF to one node. That node will distribute the UDF to other nodes.
634	responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String())
635	if err != nil {
636		return nil, err
637	}
638
639	response := responseMap[strCmd.String()]
640	res := make(map[string]string)
641	vals := strings.Split(response, ";")
642	for _, pair := range vals {
643		t := strings.SplitN(pair, "=", 2)
644		if len(t) == 2 {
645			res[t[0]] = t[1]
646		} else if len(t) == 1 {
647			res[t[0]] = ""
648		}
649	}
650
651	if _, exists := res["error"]; exists {
652		msg, _ := base64.StdEncoding.DecodeString(res["message"])
653		return nil, NewAerospikeError(COMMAND_REJECTED, fmt.Sprintf("Registration failed: %s\nFile: %s\nLine: %s\nMessage: %s",
654			res["error"], res["file"], res["line"], msg))
655	}
656	return NewRegisterTask(clnt.cluster, serverPath), nil
657}
658
659// RemoveUDF removes a package containing user defined functions in the server.
660// This asynchronous server call will return before command is complete.
661// The user can optionally wait for command completion by using the returned
662// RemoveTask instance.
663//
664// This method is only supported by Aerospike 3+ servers.
665// If the policy is nil, the default relevant policy will be used.
666func (clnt *Client) RemoveUDF(policy *WritePolicy, udfName string) (*RemoveTask, error) {
667	policy = clnt.getUsableWritePolicy(policy)
668	var strCmd bytes.Buffer
669	// errors are to remove errcheck warnings
670	// they will always be nil as stated in golang docs
671	_, err := strCmd.WriteString("udf-remove:filename=")
672	_, err = strCmd.WriteString(udfName)
673	_, err = strCmd.WriteString(";")
674
675	// Send command to one node. That node will distribute it to other nodes.
676	responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String())
677	if err != nil {
678		return nil, err
679	}
680
681	response := responseMap[strCmd.String()]
682	if response == "ok" {
683		return NewRemoveTask(clnt.cluster, udfName), nil
684	}
685	return nil, NewAerospikeError(SERVER_ERROR, response)
686}
687
688// ListUDF lists all packages containing user defined functions in the server.
689// This method is only supported by Aerospike 3+ servers.
690// If the policy is nil, the default relevant policy will be used.
691func (clnt *Client) ListUDF(policy *BasePolicy) ([]*UDF, error) {
692	policy = clnt.getUsablePolicy(policy)
693
694	var strCmd bytes.Buffer
695	// errors are to remove errcheck warnings
696	// they will always be nil as stated in golang docs
697	_, err := strCmd.WriteString("udf-list")
698
699	// Send command to one node. That node will distribute it to other nodes.
700	responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String())
701	if err != nil {
702		return nil, err
703	}
704
705	response := responseMap[strCmd.String()]
706	vals := strings.Split(response, ";")
707	res := make([]*UDF, 0, len(vals))
708
709	for _, udfInfo := range vals {
710		if strings.Trim(udfInfo, " ") == "" {
711			continue
712		}
713		udfParts := strings.Split(udfInfo, ",")
714
715		udf := &UDF{}
716		for _, values := range udfParts {
717			valueParts := strings.Split(values, "=")
718			if len(valueParts) == 2 {
719				switch valueParts[0] {
720				case "filename":
721					udf.Filename = valueParts[1]
722				case "hash":
723					udf.Hash = valueParts[1]
724				case "type":
725					udf.Language = Language(valueParts[1])
726				}
727			}
728		}
729		res = append(res, udf)
730	}
731
732	return res, nil
733}
734
735// Execute executes a user defined function on server and return results.
736// The function operates on a single record.
737// The package name is used to locate the udf file location:
738//
739// udf file = <server udf dir>/<package name>.lua
740//
741// This method is only supported by Aerospike 3+ servers.
742// If the policy is nil, the default relevant policy will be used.
743func (clnt *Client) Execute(policy *WritePolicy, key *Key, packageName string, functionName string, args ...Value) (interface{}, error) {
744	policy = clnt.getUsableWritePolicy(policy)
745	command, err := newExecuteCommand(clnt.cluster, policy, key, packageName, functionName, NewValueArray(args))
746	if err != nil {
747		return nil, err
748	}
749
750	if err := command.Execute(); err != nil {
751		return nil, err
752	}
753
754	record := command.GetRecord()
755
756	if record == nil || len(record.Bins) == 0 {
757		return nil, nil
758	}
759
760	for k, v := range record.Bins {
761		if strings.Contains(k, "SUCCESS") {
762			return v, nil
763		} else if strings.Contains(k, "FAILURE") {
764			return nil, fmt.Errorf("%v", v)
765		}
766	}
767
768	return nil, ErrUDFBadResponse
769}
770
771//----------------------------------------------------------
772// Query/Execute (Supported by Aerospike 3+ servers only)
773//----------------------------------------------------------
774
775// QueryExecute applies operations on records that match the statement filter.
776// Records are not returned to the client.
777// This asynchronous server call will return before the command is complete.
778// The user can optionally wait for command completion by using the returned
779// ExecuteTask instance.
780//
781// This method is only supported by Aerospike 3+ servers.
782// If the policy is nil, the default relevant policy will be used.
783func (clnt *Client) QueryExecute(policy *QueryPolicy,
784	writePolicy *WritePolicy,
785	statement *Statement,
786	ops ...*Operation,
787) (*ExecuteTask, error) {
788
789	if len(ops) == 0 {
790		return nil, ErrNoOperationsSpecified
791	}
792
793	if len(statement.BinNames) > 0 {
794		return nil, ErrNoBinNamesAlloedInQueryExecute
795	}
796
797	policy = clnt.getUsableQueryPolicy(policy)
798	writePolicy = clnt.getUsableWritePolicy(writePolicy)
799
800	nodes := clnt.cluster.GetNodes()
801	if len(nodes) == 0 {
802		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "ExecuteOperations failed because cluster is empty.")
803	}
804
805	statement.prepare(false)
806
807	errs := []error{}
808	for i := range nodes {
809		command := newServerCommand(nodes[i], policy, writePolicy, statement, ops)
810		if err := command.Execute(); err != nil {
811			errs = append(errs, err)
812		}
813	}
814
815	return NewExecuteTask(clnt.cluster, statement), mergeErrors(errs)
816}
817
818// ExecuteUDF applies user defined function on records that match the statement filter.
819// Records are not returned to the client.
820// This asynchronous server call will return before command is complete.
821// The user can optionally wait for command completion by using the returned
822// ExecuteTask instance.
823//
824// This method is only supported by Aerospike 3+ servers.
825// If the policy is nil, the default relevant policy will be used.
826func (clnt *Client) ExecuteUDF(policy *QueryPolicy,
827	statement *Statement,
828	packageName string,
829	functionName string,
830	functionArgs ...Value,
831) (*ExecuteTask, error) {
832	policy = clnt.getUsableQueryPolicy(policy)
833
834	nodes := clnt.cluster.GetNodes()
835	if len(nodes) == 0 {
836		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "ExecuteUDF failed because cluster is empty.")
837	}
838
839	statement.SetAggregateFunction(packageName, functionName, functionArgs, false)
840
841	errs := []error{}
842	for i := range nodes {
843		command := newServerCommand(nodes[i], policy, nil, statement, nil)
844		if err := command.Execute(); err != nil {
845			errs = append(errs, err)
846		}
847	}
848
849	return NewExecuteTask(clnt.cluster, statement), mergeErrors(errs)
850}
851
852// ExecuteUDFNode applies user defined function on records that match the statement filter on the specified node.
853// Records are not returned to the client.
854// This asynchronous server call will return before command is complete.
855// The user can optionally wait for command completion by using the returned
856// ExecuteTask instance.
857//
858// This method is only supported by Aerospike 3+ servers.
859// If the policy is nil, the default relevant policy will be used.
860func (clnt *Client) ExecuteUDFNode(policy *QueryPolicy,
861	node *Node,
862	statement *Statement,
863	packageName string,
864	functionName string,
865	functionArgs ...Value,
866) (*ExecuteTask, error) {
867	policy = clnt.getUsableQueryPolicy(policy)
868
869	if node == nil {
870		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "ExecuteUDFNode failed because node is nil.")
871	}
872
873	statement.SetAggregateFunction(packageName, functionName, functionArgs, false)
874
875	command := newServerCommand(node, policy, nil, statement, nil)
876	err := command.Execute()
877
878	return NewExecuteTask(clnt.cluster, statement), err
879}
880
881//--------------------------------------------------------
882// Query functions (Supported by Aerospike 3+ servers only)
883//--------------------------------------------------------
884
885// Query executes a query and returns a Recordset.
886// The query executor puts records on the channel from separate goroutines.
887// The caller can concurrently pop records off the channel through the
888// Recordset.Records channel.
889//
890// This method is only supported by Aerospike 3+ servers.
891// If the policy is nil, the default relevant policy will be used.
892func (clnt *Client) Query(policy *QueryPolicy, statement *Statement) (*Recordset, error) {
893	policy = clnt.getUsableQueryPolicy(policy)
894
895	nodes := clnt.cluster.GetNodes()
896	if len(nodes) == 0 {
897		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "Query failed because cluster is empty.")
898	}
899
900	clusterKey := int64(0)
901	if policy.FailOnClusterChange {
902		var err error
903		clusterKey, err = queryValidateBegin(nodes[0], statement.Namespace)
904		if err != nil {
905			return nil, err
906		}
907	}
908
909	first := NewAtomicBool(true)
910
911	// results channel must be async for performance
912	recSet := newRecordset(policy.RecordQueueSize, len(nodes), statement.TaskId)
913
914	// results channel must be async for performance
915	for _, node := range nodes {
916		// copy policies to avoid race conditions
917		newPolicy := *policy
918		command := newQueryRecordCommand(node, &newPolicy, statement, recSet, clusterKey, first.CompareAndToggle(true))
919		go func() {
920			command.Execute()
921		}()
922	}
923
924	return recSet, nil
925}
926
927// QueryNode executes a query on a specific node and returns a recordset.
928// The caller can concurrently pop records off the channel through the
929// record channel.
930//
931// This method is only supported by Aerospike 3+ servers.
932// If the policy is nil, the default relevant policy will be used.
933func (clnt *Client) QueryNode(policy *QueryPolicy, node *Node, statement *Statement) (*Recordset, error) {
934	policy = clnt.getUsableQueryPolicy(policy)
935
936	// results channel must be async for performance
937	recSet := newRecordset(policy.RecordQueueSize, 1, statement.TaskId)
938
939	clusterKey := int64(0)
940	if policy.FailOnClusterChange {
941		var err error
942		clusterKey, err = queryValidateBegin(node, statement.Namespace)
943		if err != nil {
944			return nil, err
945		}
946	}
947
948	// copy policies to avoid race conditions
949	newPolicy := *policy
950	command := newQueryRecordCommand(node, &newPolicy, statement, recSet, clusterKey, true)
951	go func() {
952		command.Execute()
953	}()
954
955	return recSet, nil
956}
957
958// CreateIndex creates a secondary index.
959// This asynchronous server call will return before the command is complete.
960// The user can optionally wait for command completion by using the returned
961// IndexTask instance.
962// This method is only supported by Aerospike 3+ servers.
963// If the policy is nil, the default relevant policy will be used.
964func (clnt *Client) CreateIndex(
965	policy *WritePolicy,
966	namespace string,
967	setName string,
968	indexName string,
969	binName string,
970	indexType IndexType,
971) (*IndexTask, error) {
972	policy = clnt.getUsableWritePolicy(policy)
973	return clnt.CreateComplexIndex(policy, namespace, setName, indexName, binName, indexType, ICT_DEFAULT)
974}
975
976// CreateComplexIndex creates a secondary index, with the ability to put indexes
977// on bin containing complex data types, e.g: Maps and Lists.
978// This asynchronous server call will return before the command is complete.
979// The user can optionally wait for command completion by using the returned
980// IndexTask instance.
981// This method is only supported by Aerospike 3+ servers.
982// If the policy is nil, the default relevant policy will be used.
983func (clnt *Client) CreateComplexIndex(
984	policy *WritePolicy,
985	namespace string,
986	setName string,
987	indexName string,
988	binName string,
989	indexType IndexType,
990	indexCollectionType IndexCollectionType,
991) (*IndexTask, error) {
992	policy = clnt.getUsableWritePolicy(policy)
993
994	var strCmd bytes.Buffer
995	_, err := strCmd.WriteString("sindex-create:ns=")
996	_, err = strCmd.WriteString(namespace)
997
998	if len(setName) > 0 {
999		_, err = strCmd.WriteString(";set=")
1000		_, err = strCmd.WriteString(setName)
1001	}
1002
1003	_, err = strCmd.WriteString(";indexname=")
1004	_, err = strCmd.WriteString(indexName)
1005	_, err = strCmd.WriteString(";numbins=1")
1006
1007	if indexCollectionType != ICT_DEFAULT {
1008		_, err = strCmd.WriteString(";indextype=")
1009		_, err = strCmd.WriteString(ictToString(indexCollectionType))
1010	}
1011
1012	_, err = strCmd.WriteString(";indexdata=")
1013	_, err = strCmd.WriteString(binName)
1014	_, err = strCmd.WriteString(",")
1015	_, err = strCmd.WriteString(string(indexType))
1016	_, err = strCmd.WriteString(";priority=normal")
1017
1018	// Send index command to one node. That node will distribute the command to other nodes.
1019	responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String())
1020	if err != nil {
1021		return nil, err
1022	}
1023
1024	response := responseMap[strCmd.String()]
1025	if strings.EqualFold(response, "OK") {
1026		// Return task that could optionally be polled for completion.
1027		return NewIndexTask(clnt.cluster, namespace, indexName), nil
1028	}
1029
1030	if strings.HasPrefix(response, "FAIL:200") {
1031		// Index has already been created.  Do not need to poll for completion.
1032		return nil, NewAerospikeError(INDEX_FOUND)
1033	}
1034
1035	return nil, NewAerospikeError(INDEX_GENERIC, "Create index failed: "+response)
1036}
1037
1038// DropIndex deletes a secondary index. It will block until index is dropped on all nodes.
1039// This method is only supported by Aerospike 3+ servers.
1040// If the policy is nil, the default relevant policy will be used.
1041func (clnt *Client) DropIndex(
1042	policy *WritePolicy,
1043	namespace string,
1044	setName string,
1045	indexName string,
1046) error {
1047	policy = clnt.getUsableWritePolicy(policy)
1048	var strCmd bytes.Buffer
1049	_, err := strCmd.WriteString("sindex-delete:ns=")
1050	_, err = strCmd.WriteString(namespace)
1051
1052	if len(setName) > 0 {
1053		_, err = strCmd.WriteString(";set=")
1054		_, err = strCmd.WriteString(setName)
1055	}
1056	_, err = strCmd.WriteString(";indexname=")
1057	_, err = strCmd.WriteString(indexName)
1058
1059	// Send index command to one node. That node will distribute the command to other nodes.
1060	responseMap, err := clnt.sendInfoCommand(policy.TotalTimeout, strCmd.String())
1061	if err != nil {
1062		return err
1063	}
1064
1065	response := responseMap[strCmd.String()]
1066
1067	if strings.EqualFold(response, "OK") {
1068		// Return task that could optionally be polled for completion.
1069		task := NewDropIndexTask(clnt.cluster, namespace, indexName)
1070		return <-task.OnComplete()
1071	}
1072
1073	if strings.HasPrefix(response, "FAIL:201") {
1074		// Index did not previously exist. Return without error.
1075		return nil
1076	}
1077
1078	return NewAerospikeError(INDEX_GENERIC, "Drop index failed: "+response)
1079}
1080
1081// Truncate removes records in specified namespace/set efficiently.  This method is many orders of magnitude
1082// faster than deleting records one at a time.  Works with Aerospike Server versions >= 3.12.
1083// This asynchronous server call may return before the truncation is complete.  The user can still
1084// write new records after the server call returns because new records will have last update times
1085// greater than the truncate cutoff (set at the time of truncate call).
1086// For more information, See https://www.aerospike.com/docs/reference/info#truncate
1087func (clnt *Client) Truncate(policy *WritePolicy, namespace, set string, beforeLastUpdate *time.Time) error {
1088	policy = clnt.getUsableWritePolicy(policy)
1089
1090	node, err := clnt.cluster.GetRandomNode()
1091	if err != nil {
1092		return err
1093	}
1094
1095	node.tendConnLock.Lock()
1096	defer node.tendConnLock.Unlock()
1097
1098	if err := node.initTendConn(policy.TotalTimeout); err != nil {
1099		return err
1100	}
1101
1102	var strCmd bytes.Buffer
1103	if len(set) > 0 {
1104		_, err = strCmd.WriteString("truncate:namespace=")
1105		_, err = strCmd.WriteString(namespace)
1106		_, err = strCmd.WriteString(";set=")
1107		_, err = strCmd.WriteString(set)
1108	} else {
1109		// Servers >= 4.5.1.0 support truncate-namespace.
1110		if node.supportsTruncateNamespace.Get() {
1111			_, err = strCmd.WriteString("truncate-namespace:namespace=")
1112			_, err = strCmd.WriteString(namespace)
1113		} else {
1114			_, err = strCmd.WriteString("truncate:namespace=")
1115			_, err = strCmd.WriteString(namespace)
1116		}
1117	}
1118	if beforeLastUpdate != nil {
1119		_, err = strCmd.WriteString(";lut=")
1120		_, err = strCmd.WriteString(strconv.FormatInt(beforeLastUpdate.UnixNano(), 10))
1121	} else {
1122		// Servers >= 4.3.1.4 and <= 4.5.0.1 require lut argument.
1123		if node.supportsLUTNow.Get() {
1124			_, err = strCmd.WriteString(";lut=now")
1125		}
1126	}
1127
1128	responseMap, err := RequestInfo(node.tendConn, strCmd.String())
1129	if err != nil {
1130		node.tendConn.Close()
1131		return err
1132	}
1133
1134	response := responseMap[strCmd.String()]
1135	if strings.EqualFold(response, "OK") {
1136		return nil
1137	}
1138
1139	return NewAerospikeError(SERVER_ERROR, "Truncate failed: "+response)
1140}
1141
1142//-------------------------------------------------------
1143// User administration
1144//-------------------------------------------------------
1145
1146// CreateUser creates a new user with password and roles. Clear-text password will be hashed using bcrypt
1147// before sending to server.
1148func (clnt *Client) CreateUser(policy *AdminPolicy, user string, password string, roles []string) error {
1149	policy = clnt.getUsableAdminPolicy(policy)
1150
1151	hash, err := hashPassword(password)
1152	if err != nil {
1153		return err
1154	}
1155	command := newAdminCommand(nil)
1156	return command.createUser(clnt.cluster, policy, user, hash, roles)
1157}
1158
1159// DropUser removes a user from the cluster.
1160func (clnt *Client) DropUser(policy *AdminPolicy, user string) error {
1161	policy = clnt.getUsableAdminPolicy(policy)
1162
1163	command := newAdminCommand(nil)
1164	return command.dropUser(clnt.cluster, policy, user)
1165}
1166
1167// ChangePassword changes a user's password. Clear-text password will be hashed using bcrypt before sending to server.
1168func (clnt *Client) ChangePassword(policy *AdminPolicy, user string, password string) error {
1169	policy = clnt.getUsableAdminPolicy(policy)
1170
1171	if clnt.cluster.user == "" {
1172		return NewAerospikeError(INVALID_USER)
1173	}
1174
1175	hash, err := hashPassword(password)
1176	if err != nil {
1177		return err
1178	}
1179	command := newAdminCommand(nil)
1180
1181	if user == clnt.cluster.user {
1182		// Change own password.
1183		if err := command.changePassword(clnt.cluster, policy, user, hash); err != nil {
1184			return err
1185		}
1186	} else {
1187		// Change other user's password by user admin.
1188		if err := command.setPassword(clnt.cluster, policy, user, hash); err != nil {
1189			return err
1190		}
1191	}
1192
1193	clnt.cluster.changePassword(user, password, hash)
1194
1195	return nil
1196}
1197
1198// GrantRoles adds roles to user's list of roles.
1199func (clnt *Client) GrantRoles(policy *AdminPolicy, user string, roles []string) error {
1200	policy = clnt.getUsableAdminPolicy(policy)
1201
1202	command := newAdminCommand(nil)
1203	return command.grantRoles(clnt.cluster, policy, user, roles)
1204}
1205
1206// RevokeRoles removes roles from user's list of roles.
1207func (clnt *Client) RevokeRoles(policy *AdminPolicy, user string, roles []string) error {
1208	policy = clnt.getUsableAdminPolicy(policy)
1209
1210	command := newAdminCommand(nil)
1211	return command.revokeRoles(clnt.cluster, policy, user, roles)
1212}
1213
1214// QueryUser retrieves roles for a given user.
1215func (clnt *Client) QueryUser(policy *AdminPolicy, user string) (*UserRoles, error) {
1216	policy = clnt.getUsableAdminPolicy(policy)
1217
1218	command := newAdminCommand(nil)
1219	return command.queryUser(clnt.cluster, policy, user)
1220}
1221
1222// QueryUsers retrieves all users and their roles.
1223func (clnt *Client) QueryUsers(policy *AdminPolicy) ([]*UserRoles, error) {
1224	policy = clnt.getUsableAdminPolicy(policy)
1225
1226	command := newAdminCommand(nil)
1227	return command.queryUsers(clnt.cluster, policy)
1228}
1229
1230// QueryRole retrieves privileges for a given role.
1231func (clnt *Client) QueryRole(policy *AdminPolicy, role string) (*Role, error) {
1232	policy = clnt.getUsableAdminPolicy(policy)
1233
1234	command := newAdminCommand(nil)
1235	return command.queryRole(clnt.cluster, policy, role)
1236}
1237
1238// QueryRoles retrieves all roles and their privileges.
1239func (clnt *Client) QueryRoles(policy *AdminPolicy) ([]*Role, error) {
1240	policy = clnt.getUsableAdminPolicy(policy)
1241
1242	command := newAdminCommand(nil)
1243	return command.queryRoles(clnt.cluster, policy)
1244}
1245
1246// CreateRole creates a user-defined role.
1247func (clnt *Client) CreateRole(policy *AdminPolicy, roleName string, privileges []Privilege, whitelist []string) error {
1248	policy = clnt.getUsableAdminPolicy(policy)
1249
1250	command := newAdminCommand(nil)
1251	return command.createRole(clnt.cluster, policy, roleName, privileges, whitelist)
1252}
1253
1254// DropRole removes a user-defined role.
1255func (clnt *Client) DropRole(policy *AdminPolicy, roleName string) error {
1256	policy = clnt.getUsableAdminPolicy(policy)
1257
1258	command := newAdminCommand(nil)
1259	return command.dropRole(clnt.cluster, policy, roleName)
1260}
1261
1262// GrantPrivileges grant privileges to a user-defined role.
1263func (clnt *Client) GrantPrivileges(policy *AdminPolicy, roleName string, privileges []Privilege) error {
1264	policy = clnt.getUsableAdminPolicy(policy)
1265
1266	command := newAdminCommand(nil)
1267	return command.grantPrivileges(clnt.cluster, policy, roleName, privileges)
1268}
1269
1270// RevokePrivileges revokes privileges from a user-defined role.
1271func (clnt *Client) RevokePrivileges(policy *AdminPolicy, roleName string, privileges []Privilege) error {
1272	policy = clnt.getUsableAdminPolicy(policy)
1273
1274	command := newAdminCommand(nil)
1275	return command.revokePrivileges(clnt.cluster, policy, roleName, privileges)
1276}
1277
1278// SetWhitelist sets IP address whitelist for a role. If whitelist is nil or empty, it removes existing whitelist from role.
1279func (clnt *Client) SetWhitelist(policy *AdminPolicy, roleName string, whitelist []string) error {
1280	policy = clnt.getUsableAdminPolicy(policy)
1281
1282	command := newAdminCommand(nil)
1283	return command.setWhitelist(clnt.cluster, policy, roleName, whitelist)
1284}
1285
1286// Cluster exposes the cluster object to the user
1287func (clnt *Client) Cluster() *Cluster {
1288	return clnt.cluster
1289}
1290
1291// String implements the Stringer interface for client
1292func (clnt *Client) String() string {
1293	if clnt.cluster != nil {
1294		return clnt.cluster.String()
1295	}
1296	return ""
1297}
1298
1299// Stats returns internal statistics regarding the inner state of the client and the cluster.
1300func (clnt *Client) Stats() (map[string]interface{}, error) {
1301	resStats := clnt.cluster.statsCopy()
1302
1303	clusterStats := nodeStats{}
1304	for _, stats := range resStats {
1305		clusterStats.aggregate(&stats)
1306	}
1307
1308	resStats["cluster-aggregated-stats"] = clusterStats
1309
1310	b, err := json.Marshal(resStats)
1311	if err != nil {
1312		return nil, err
1313	}
1314
1315	res := map[string]interface{}{}
1316	err = json.Unmarshal(b, &res)
1317	if err != nil {
1318		return nil, err
1319	}
1320
1321	res["open-connections"] = clusterStats.ConnectionsOpen
1322
1323	return res, nil
1324}
1325
1326// WarmUp fills the connection pool with connections for all nodes.
1327// This is necessary on startup for high traffic programs.
1328// If the count is <= 0, the connection queue will be filled.
1329// If the count is more than the size of the pool, the pool will be filled.
1330// Note: One connection per node is reserved for tend operations and is not used for transactions.
1331func (clnt *Client) WarmUp(count int) (int, error) {
1332	return clnt.cluster.WarmUp(count)
1333}
1334
1335//-------------------------------------------------------
1336// Internal Methods
1337//-------------------------------------------------------
1338
1339func (clnt *Client) sendInfoCommand(timeout time.Duration, command string) (map[string]string, error) {
1340	node, err := clnt.cluster.GetRandomNode()
1341	if err != nil {
1342		return nil, err
1343	}
1344
1345	node.tendConnLock.Lock()
1346	defer node.tendConnLock.Unlock()
1347
1348	if err := node.initTendConn(timeout); err != nil {
1349		return nil, err
1350	}
1351
1352	results, err := RequestInfo(node.tendConn, command)
1353	if err != nil {
1354		node.tendConn.Close()
1355		return nil, err
1356	}
1357
1358	return results, nil
1359}
1360
1361// batchExecute Uses sync.WaitGroup to run commands using multiple goroutines,
1362// and waits for their return
1363func (clnt *Client) batchExecute(policy *BatchPolicy, batchNodes []*batchNode, cmd batcher) (error, int) {
1364	var wg sync.WaitGroup
1365	filteredOut := 0
1366
1367	// Use a goroutine per namespace per node
1368	errs := []error{}
1369	errm := new(sync.Mutex)
1370
1371	wg.Add(len(batchNodes))
1372	if policy.ConcurrentNodes <= 0 {
1373		for _, batchNode := range batchNodes {
1374			newCmd := cmd.cloneBatchCommand(batchNode)
1375			go func(cmd command) {
1376				defer wg.Done()
1377				err := cmd.Execute()
1378				errm.Lock()
1379				if err != nil {
1380					errs = append(errs, err)
1381				}
1382				filteredOut += cmd.(batcher).filteredOut()
1383				errm.Unlock()
1384			}(newCmd)
1385		}
1386	} else {
1387		sem := semaphore.NewWeighted(int64(policy.ConcurrentNodes))
1388		ctx := context.Background()
1389
1390		for _, batchNode := range batchNodes {
1391			if err := sem.Acquire(ctx, 1); err != nil {
1392				errm.Lock()
1393				if err != nil {
1394					errs = append(errs, err)
1395				}
1396				errm.Unlock()
1397				continue
1398			}
1399
1400			newCmd := cmd.cloneBatchCommand(batchNode)
1401			go func(cmd command) {
1402				defer sem.Release(1)
1403				defer wg.Done()
1404				err := cmd.Execute()
1405				errm.Lock()
1406				if err != nil {
1407					errs = append(errs, err)
1408				}
1409				filteredOut += cmd.(batcher).filteredOut()
1410				errm.Unlock()
1411			}(newCmd)
1412		}
1413	}
1414
1415	wg.Wait()
1416	return mergeErrors(errs), filteredOut
1417}
1418
1419func (clnt *Client) getUsablePolicy(policy *BasePolicy) *BasePolicy {
1420	if policy == nil {
1421		if clnt.DefaultPolicy != nil {
1422			return clnt.DefaultPolicy
1423		}
1424		return NewPolicy()
1425	}
1426	return policy
1427}
1428
1429func (clnt *Client) getUsableBatchPolicy(policy *BatchPolicy) *BatchPolicy {
1430	if policy == nil {
1431		if clnt.DefaultBatchPolicy != nil {
1432			return clnt.DefaultBatchPolicy
1433		}
1434		return NewBatchPolicy()
1435	}
1436	return policy
1437}
1438
1439func (clnt *Client) getUsableWritePolicy(policy *WritePolicy) *WritePolicy {
1440	if policy == nil {
1441		if clnt.DefaultWritePolicy != nil {
1442			return clnt.DefaultWritePolicy
1443		}
1444		return NewWritePolicy(0, 0)
1445	}
1446	return policy
1447}
1448
1449func (clnt *Client) getUsableScanPolicy(policy *ScanPolicy) *ScanPolicy {
1450	if policy == nil {
1451		if clnt.DefaultScanPolicy != nil {
1452			return clnt.DefaultScanPolicy
1453		}
1454		return NewScanPolicy()
1455	}
1456	return policy
1457}
1458
1459func (clnt *Client) getUsableQueryPolicy(policy *QueryPolicy) *QueryPolicy {
1460	if policy == nil {
1461		if clnt.DefaultQueryPolicy != nil {
1462			return clnt.DefaultQueryPolicy
1463		}
1464		return NewQueryPolicy()
1465	}
1466	return policy
1467}
1468
1469func (clnt *Client) getUsableAdminPolicy(policy *AdminPolicy) *AdminPolicy {
1470	if policy == nil {
1471		if clnt.DefaultAdminPolicy != nil {
1472			return clnt.DefaultAdminPolicy
1473		}
1474		return NewAdminPolicy()
1475	}
1476	return policy
1477}
1478
1479//-------------------------------------------------------
1480// Utility Functions
1481//-------------------------------------------------------
1482
1483// mergeErrors merges several errors into one
1484func mergeErrors(errs []error) error {
1485	switch len(errs) {
1486	case 0:
1487		return nil
1488	case 1:
1489		return errs[0]
1490	}
1491
1492	var msg bytes.Buffer
1493	for _, err := range errs {
1494		if _, err := msg.WriteString(err.Error()); err != nil {
1495			return err
1496		}
1497		if _, err := msg.WriteString("\n"); err != nil {
1498			return err
1499		}
1500	}
1501	return errors.New(msg.String())
1502}
1503