1// Copyright 2013-2020 Aerospike, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aerospike
16
17import (
18	"bytes"
19	"compress/zlib"
20	"encoding/binary"
21	"fmt"
22	"io"
23	"math"
24	"net"
25	"time"
26
27	. "github.com/aerospike/aerospike-client-go/logger"
28	. "github.com/aerospike/aerospike-client-go/types"
29
30	ParticleType "github.com/aerospike/aerospike-client-go/internal/particle_type"
31	Buffer "github.com/aerospike/aerospike-client-go/utils/buffer"
32)
33
const (
	// Flags commented out are not supported by this client.

	// Contains a read operation.
	_INFO1_READ int = (1 << 0)
	// Get all bins.
	_INFO1_GET_ALL int = (1 << 1)
	// Batch read or exists.
	_INFO1_BATCH int = (1 << 3)

	// Do not read the bins.
	_INFO1_NOBINDATA int = (1 << 5)

	// Involve all replicas in read operation.
	_INFO1_READ_MODE_AP_ALL = (1 << 6)

	// Tell server to compress its response.
	_INFO1_COMPRESS_RESPONSE = (1 << 7)

	// Create or update record.
	_INFO2_WRITE int = (1 << 0)
	// Fling a record into the belly of Moloch (delete the record).
	_INFO2_DELETE int = (1 << 1)
	// Update if expected generation == old.
	_INFO2_GENERATION int = (1 << 2)
	// Update if new generation >= old, good for restore.
	_INFO2_GENERATION_GT int = (1 << 3)
	// Transaction resulting in record deletion leaves tombstone (Enterprise only).
	_INFO2_DURABLE_DELETE int = (1 << 4)
	// Create only. Fail if record already exists.
	_INFO2_CREATE_ONLY int = (1 << 5)

	// Return a result for every operation.
	_INFO2_RESPOND_ALL_OPS int = (1 << 7)

	// This is the last of a multi-part message.
	_INFO3_LAST int = (1 << 0)
	// Commit to master only before declaring success.
	_INFO3_COMMIT_MASTER int = (1 << 1)
	// Update only. Merge bins.
	_INFO3_UPDATE_ONLY int = (1 << 3)

	// Create or completely replace record.
	_INFO3_CREATE_OR_REPLACE int = (1 << 4)
	// Completely replace existing record only.
	_INFO3_REPLACE_ONLY int = (1 << 5)
	// Strong-consistency read type bit; interpretation table below.
	_INFO3_SC_READ_TYPE int = (1 << 6)
	// Strong-consistency read relax bit; interpretation table below.
	_INFO3_SC_READ_RELAX int = (1 << 7)

	// Interpret SC_READ bits in info3.
	//
	// RELAX   TYPE
	//	                strict
	//	                ------
	//   0      0     sequential (default)
	//   0      1     linearize
	//
	//	                relaxed
	//	                -------
	//   1      0     allow replica
	//   1      1     allow unavailable

	// Fixed wire-format sizes (in bytes) and protocol version/type constants.
	_MSG_TOTAL_HEADER_SIZE     uint8 = 30
	_FIELD_HEADER_SIZE         uint8 = 5
	_OPERATION_HEADER_SIZE     uint8 = 8
	_MSG_REMAINING_HEADER_SIZE uint8 = 22
	_DIGEST_SIZE               uint8 = 20
	_COMPRESS_THRESHOLD        int   = 128
	_CL_MSG_VERSION            int64 = 2
	_AS_MSG_TYPE               int64 = 3
	_AS_MSG_TYPE_COMPRESSED    int64 = 4
)
107
108// command interface describes all commands available
// command interface describes all commands available.
//
// The ifc argument passed to most methods is the concrete command itself;
// it lets the shared baseCommand implementation call back into the specific
// command type (manual dynamic dispatch).
type command interface {
	// getPolicy returns the policy governing this command's execution.
	getPolicy(ifc command) Policy

	// writeBuffer serializes the command into the data buffer.
	writeBuffer(ifc command) error
	// getNode returns the cluster node the command should be sent to.
	getNode(ifc command) (*Node, error)
	// getConnection acquires a connection for the command.
	getConnection(policy Policy) (*Connection, error)
	// putConnection returns the connection to its pool.
	putConnection(conn *Connection)
	// parseResult reads and interprets the server response.
	parseResult(ifc command, conn *Connection) error
	// parseRecordResults parses record results from the response; the
	// returned bool reports whether parsing should continue.
	parseRecordResults(ifc command, receiveSize int) (bool, error)
	// prepareRetry readies the command for another attempt; the returned
	// bool reports whether a retry is possible.
	prepareRetry(ifc command, isTimeout bool) bool

	execute(ifc command, isRead bool) error
	executeAt(ifc command, policy *BasePolicy, isRead bool, deadline time.Time, iterations, commandSentCounter int) error

	// Executes the command
	Execute() error
}
126
127// Holds data buffer for the command
// Holds data buffer for the command
type baseCommand struct {
	// node is the cluster node this command is directed to.
	node *Node
	// conn is the connection checked out for this command.
	conn *Connection

	// dataBufferCompress is not a second buffer; it is just a pointer to
	// the beginning of the dataBuffer.
	// To avoid allocating multiple buffers before compression, the dataBuffer
	// will be referencing to a padded buffer. After the command is written to
	// the buffer, this padding will be used to compress the command in-place,
	// and then the compressed proto header will be written.
	dataBufferCompress []byte
	// dataBuffer holds the serialized wire command; dataOffset is the
	// current write (or size-estimation) position within it.
	dataBuffer         []byte
	dataOffset         int

	// oneShot determines if streaming commands like query, scan or queryAggregate
	// are not retried if they error out mid-parsing
	oneShot bool

	// will determine if the buffer will be compressed
	// before being sent to the server
	compressed bool
}
150
151// Writes the command for write operations
// Writes the command for write operations.
//
// Exactly one of bins or binMap is expected to be provided by the caller:
// when binMap is nil the bins slice drives both the size-estimation pass and
// the write pass; otherwise binMap does. Both passes must visit the same
// data so the buffer allocated by sizeBuffer matches what is written.
func (cmd *baseCommand) setWrite(policy *WritePolicy, operation OperationType, key *Key, bins []*Bin, binMap BinMap) error {
	cmd.begin()
	fieldCount, err := cmd.estimateKeySize(key, policy.SendKey)
	if err != nil {
		return err
	}

	// An optional predicate expression adds one more field.
	predSize := 0
	if len(policy.PredExp) > 0 {
		predSize = cmd.estimatePredExpSize(policy.PredExp)
		fieldCount++
	}

	// Size-estimation pass over the bins.
	if binMap == nil {
		for i := range bins {
			if err := cmd.estimateOperationSizeForBin(bins[i]); err != nil {
				return err
			}
		}
	} else {
		for name, value := range binMap {
			if err := cmd.estimateOperationSizeForBinNameAndValue(name, value); err != nil {
				return err
			}
		}
	}

	// Allocate the buffer; padded when compression is enabled so it can be
	// compressed in place later.
	if err := cmd.sizeBuffer(policy.compress()); err != nil {
		return err
	}

	if binMap == nil {
		cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, len(bins))
	} else {
		cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, len(binMap))
	}

	cmd.writeKey(key, policy.SendKey)

	if len(policy.PredExp) > 0 {
		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
			return err
		}
	}

	// Write pass: one operation per bin, mirroring the estimation pass.
	if binMap == nil {
		for i := range bins {
			if err := cmd.writeOperationForBin(bins[i], operation); err != nil {
				return err
			}
		}
	} else {
		for name, value := range binMap {
			if err := cmd.writeOperationForBinNameAndValue(name, value, operation); err != nil {
				return err
			}
		}
	}

	cmd.end()
	cmd.markCompressed(policy)

	return nil
}
216
217// Writes the command for delete operations
218func (cmd *baseCommand) setDelete(policy *WritePolicy, key *Key) error {
219	cmd.begin()
220	fieldCount, err := cmd.estimateKeySize(key, false)
221	if err != nil {
222		return err
223	}
224
225	predSize := 0
226	if len(policy.PredExp) > 0 {
227		predSize = cmd.estimatePredExpSize(policy.PredExp)
228		fieldCount++
229	}
230
231	if err := cmd.sizeBuffer(policy.compress()); err != nil {
232		return err
233	}
234	cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE|_INFO2_DELETE, fieldCount, 0)
235	cmd.writeKey(key, false)
236	if len(policy.PredExp) > 0 {
237		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
238			return err
239		}
240	}
241	cmd.end()
242	cmd.markCompressed(policy)
243
244	return nil
245
246}
247
248// Writes the command for touch operations
249func (cmd *baseCommand) setTouch(policy *WritePolicy, key *Key) error {
250	cmd.begin()
251	fieldCount, err := cmd.estimateKeySize(key, policy.SendKey)
252	if err != nil {
253		return err
254	}
255
256	predSize := 0
257	if len(policy.PredExp) > 0 {
258		predSize = cmd.estimatePredExpSize(policy.PredExp)
259		fieldCount++
260	}
261
262	cmd.estimateOperationSize()
263	if err := cmd.sizeBuffer(false); err != nil {
264		return err
265	}
266	cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, 1)
267	cmd.writeKey(key, policy.SendKey)
268	if len(policy.PredExp) > 0 {
269		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
270			return err
271		}
272	}
273	cmd.writeOperationForOperationType(_TOUCH)
274	cmd.end()
275	return nil
276
277}
278
279// Writes the command for exist operations
280func (cmd *baseCommand) setExists(policy *BasePolicy, key *Key) error {
281	cmd.begin()
282	fieldCount, err := cmd.estimateKeySize(key, false)
283	if err != nil {
284		return err
285	}
286
287	predSize := 0
288	if len(policy.PredExp) > 0 {
289		predSize = cmd.estimatePredExpSize(policy.PredExp)
290		fieldCount++
291	}
292
293	if err := cmd.sizeBuffer(false); err != nil {
294		return err
295	}
296	cmd.writeHeader(policy, _INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 0)
297	cmd.writeKey(key, false)
298	if len(policy.PredExp) > 0 {
299		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
300			return err
301		}
302	}
303	cmd.end()
304	return nil
305
306}
307
308// Writes the command for get operations (all bins)
309func (cmd *baseCommand) setReadForKeyOnly(policy *BasePolicy, key *Key) error {
310	cmd.begin()
311	fieldCount, err := cmd.estimateKeySize(key, false)
312	if err != nil {
313		return err
314	}
315	predSize := 0
316	if len(policy.PredExp) > 0 {
317		predSize = cmd.estimatePredExpSize(policy.PredExp)
318		fieldCount++
319	}
320	if err := cmd.sizeBuffer(false); err != nil {
321		return err
322	}
323	cmd.writeHeader(policy, _INFO1_READ|_INFO1_GET_ALL, 0, fieldCount, 0)
324	cmd.writeKey(key, false)
325	if len(policy.PredExp) > 0 {
326		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
327			return err
328		}
329	}
330	cmd.end()
331	return nil
332
333}
334
335// Writes the command for get operations (specified bins)
336func (cmd *baseCommand) setRead(policy *BasePolicy, key *Key, binNames []string) (err error) {
337	if len(binNames) > 0 {
338		cmd.begin()
339		fieldCount, err := cmd.estimateKeySize(key, false)
340		if err != nil {
341			return err
342		}
343
344		predSize := 0
345		if len(policy.PredExp) > 0 {
346			predSize = cmd.estimatePredExpSize(policy.PredExp)
347			fieldCount++
348		}
349
350		for i := range binNames {
351			cmd.estimateOperationSizeForBinName(binNames[i])
352		}
353		if err = cmd.sizeBuffer(false); err != nil {
354			return nil
355		}
356		cmd.writeHeader(policy, _INFO1_READ, 0, fieldCount, len(binNames))
357		cmd.writeKey(key, false)
358
359		if len(policy.PredExp) > 0 {
360			cmd.writePredExp(policy.PredExp, predSize)
361		}
362
363		for i := range binNames {
364			cmd.writeOperationForBinName(binNames[i], _READ)
365		}
366		cmd.end()
367	} else {
368		err = cmd.setReadForKeyOnly(policy, key)
369	}
370
371	return err
372}
373
374// Writes the command for getting metadata operations
375func (cmd *baseCommand) setReadHeader(policy *BasePolicy, key *Key) error {
376	cmd.begin()
377	fieldCount, err := cmd.estimateKeySize(key, false)
378	if err != nil {
379		return err
380	}
381
382	predSize := 0
383	if len(policy.PredExp) > 0 {
384		predSize = cmd.estimatePredExpSize(policy.PredExp)
385		fieldCount++
386	}
387
388	cmd.estimateOperationSizeForBinName("")
389	if err := cmd.sizeBuffer(policy.compress()); err != nil {
390		return err
391	}
392
393	cmd.writeHeader(policy, _INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 1)
394
395	cmd.writeKey(key, false)
396	if len(policy.PredExp) > 0 {
397		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
398			return err
399		}
400	}
401	cmd.writeOperationForBinName("", _READ)
402	cmd.end()
403	return nil
404
405}
406
407// Implements different command operations
// Implements different command operations.
//
// setOperate writes a multi-operation command: it classifies the operations
// to derive the read/write header attributes, estimates sizes, then writes
// the header, key, optional predicate expressions and every operation in
// order. The returned bool reports whether the command contains any write
// operation (callers use it for replica selection and retry policy).
func (cmd *baseCommand) setOperate(policy *WritePolicy, key *Key, operations []*Operation) (bool, error) {
	if len(operations) == 0 {
		return false, NewAerospikeError(PARAMETER_ERROR, "No operations were passed.")
	}

	cmd.begin()
	fieldCount := 0
	readAttr := 0
	writeAttr := 0
	hasWrite := false
	readBin := false
	readHeader := false
	RespondPerEachOp := policy.RespondPerEachOp

	// Classification + size-estimation pass over the operations.
	for i := range operations {
		switch operations[i].opType {
		case _BIT_READ:
			fallthrough
		case _HLL_READ:
			fallthrough
		case _MAP_READ:
			// Map operations require RespondPerEachOp to be true.
			RespondPerEachOp = true
			// Fall through to read.
			fallthrough
		case _READ, _CDT_READ:
			if !operations[i].headerOnly {
				readAttr |= _INFO1_READ

				// Read all bins if no bin is specified.
				if operations[i].binName == "" {
					readAttr |= _INFO1_GET_ALL
				}
				readBin = true
			} else {
				readAttr |= _INFO1_READ
				readHeader = true
			}
		case _BIT_MODIFY:
			fallthrough
		case _HLL_MODIFY:
			fallthrough
		case _MAP_MODIFY:
			// Map operations require RespondPerEachOp to be true.
			RespondPerEachOp = true
			// Fall through to default.
			fallthrough
		default:
			writeAttr = _INFO2_WRITE
			hasWrite = true
		}
		cmd.estimateOperationSizeForOperation(operations[i])
	}

	// The user key is only sent along with write commands.
	ksz, err := cmd.estimateKeySize(key, policy.SendKey && hasWrite)
	if err != nil {
		return hasWrite, err
	}
	fieldCount += ksz

	// An optional predicate expression adds one more field.
	predSize := 0
	if len(policy.PredExp) > 0 {
		predSize = cmd.estimatePredExpSize(policy.PredExp)
		fieldCount++
	}

	if err := cmd.sizeBuffer(policy.compress()); err != nil {
		return hasWrite, err
	}

	// Header-only reads with no bin reads request metadata without bin data.
	if readHeader && !readBin {
		readAttr |= _INFO1_NOBINDATA
	}

	if RespondPerEachOp {
		writeAttr |= _INFO2_RESPOND_ALL_OPS
	}

	// Write commands carry the full write policy; pure reads only need the
	// base policy.
	if writeAttr != 0 {
		cmd.writeHeaderWithPolicy(policy, readAttr, writeAttr, fieldCount, len(operations))
	} else {
		cmd.writeHeader(&policy.BasePolicy, readAttr, writeAttr, fieldCount, len(operations))
	}
	cmd.writeKey(key, policy.SendKey && hasWrite)

	if len(policy.PredExp) > 0 {
		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
			return hasWrite, err
		}
	}

	for _, operation := range operations {
		if err := cmd.writeOperationForOperation(operation); err != nil {
			return hasWrite, err
		}
	}

	cmd.end()
	cmd.markCompressed(policy)

	return hasWrite, nil
}
510
// setUdf writes the command for executing a server-side UDF on a single
// record.
func (cmd *baseCommand) setUdf(policy *WritePolicy, key *Key, packageName string, functionName string, args *ValueArray) error {
	cmd.begin()
	fieldCount, err := cmd.estimateKeySize(key, policy.SendKey)
	if err != nil {
		return err
	}

	// An optional predicate expression adds one more field.
	predSize := 0
	if len(policy.PredExp) > 0 {
		predSize = cmd.estimatePredExpSize(policy.PredExp)
		fieldCount++
	}

	// Package name, function name and argument blob fields.
	fc, err := cmd.estimateUdfSize(packageName, functionName, args)
	if err != nil {
		return err
	}
	fieldCount += fc

	if err := cmd.sizeBuffer(policy.compress()); err != nil {
		return err
	}

	// UDF execution is sent as a write command.
	cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, 0)
	cmd.writeKey(key, policy.SendKey)
	if len(policy.PredExp) > 0 {
		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
			return err
		}
	}
	cmd.writeFieldString(packageName, UDF_PACKAGE_NAME)
	cmd.writeFieldString(functionName, UDF_FUNCTION)
	cmd.writeUdfArgs(args)
	cmd.end()
	cmd.markCompressed(policy)

	return nil
}
549
// setBatchIndexReadCompat writes the batch-index command for the old-style
// batch API, where a single shared bin-name list applies to every key.
//
// NOTE: the size-estimation loop and the write loop below must use the same
// "repeat previous row" condition, otherwise the buffer is sized wrong.
func (cmd *baseCommand) setBatchIndexReadCompat(policy *BatchPolicy, keys []*Key, batch *batchNode, binNames []string, readAttr int) error {
	offsets := batch.offsets
	max := len(batch.offsets)
	// Each row carries the namespace field, plus the set name if requested.
	fieldCountRow := 1
	if policy.SendSetName {
		fieldCountRow = 2
	}

	binNameSize := 0
	operationCount := len(binNames)
	for _, binName := range binNames {
		binNameSize += len(binName) + int(_OPERATION_HEADER_SIZE)
	}

	// Estimate buffer size
	cmd.begin()
	fieldCount := 1
	predSize := 0
	if len(policy.PredExp) > 0 {
		predSize = cmd.estimatePredExpSize(policy.PredExp)
		fieldCount++
	}

	// Batch field header + batch count (4 bytes) + allow-inline flag (1 byte).
	cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 5

	var prev *Key
	for i := 0; i < max; i++ {
		key := keys[offsets[i]]
		// Row index (4 bytes) + digest.
		cmd.dataOffset += len(key.digest) + 4

		// Try reference equality in hope that namespace/set for all keys is set from fixed variables.
		if prev != nil && prev.namespace == key.namespace &&
			(!policy.SendSetName || prev.setName == key.setName) {
			// Can set repeat previous namespace/bin names to save space.
			cmd.dataOffset++
		} else {
			// Must write full header and namespace/set/bin names.
			cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE) + 6

			if policy.SendSetName {
				cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE)
			}
			cmd.dataOffset += binNameSize
			prev = key
		}
	}

	if err := cmd.sizeBuffer(policy.compress()); err != nil {
		return err
	}

	if policy.ReadModeAP == ReadModeAPAll {
		readAttr |= _INFO1_READ_MODE_AP_ALL
	}

	// With no bin names, the server is asked for all bins.
	if len(binNames) == 0 {
		readAttr |= _INFO1_GET_ALL
	}

	cmd.writeHeader(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, fieldCount, 0)

	if len(policy.PredExp) > 0 {
		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
			return err
		}
	}

	// Write real field size.
	// The batch field header is written with size 0 first; the real size is
	// patched in at fieldSizeOffset after all rows are written.
	fieldSizeOffset := cmd.dataOffset
	if policy.SendSetName {
		cmd.writeFieldHeader(0, BATCH_INDEX_WITH_SET)
	} else {
		cmd.writeFieldHeader(0, BATCH_INDEX)
	}

	cmd.WriteUint32(uint32(max))

	if policy.AllowInline {
		cmd.WriteByte(1)
	} else {
		cmd.WriteByte(0)
	}

	prev = nil
	for i := 0; i < max; i++ {
		index := offsets[i]
		cmd.WriteUint32(uint32(index))

		key := keys[index]
		cmd.Write(key.digest[:])
		// Try reference equality in hope that namespace/set for all keys is set from fixed variables.
		if prev != nil && prev.namespace == key.namespace &&
			(!policy.SendSetName || prev.setName == key.setName) {
			// Can set repeat previous namespace/bin names to save space.
			cmd.WriteByte(1) // repeat
		} else {
			// Write full header, namespace and bin names.
			cmd.WriteByte(0) // do not repeat
			cmd.WriteByte(byte(readAttr))
			cmd.WriteUint16(uint16(fieldCountRow))
			cmd.WriteUint16(uint16(operationCount))
			cmd.writeFieldString(key.namespace, NAMESPACE)

			if policy.SendSetName {
				cmd.writeFieldString(key.setName, TABLE)
			}

			for _, binName := range binNames {
				cmd.writeOperationForBinName(binName, _READ)
			}

			prev = key
		}
	}

	// Patch the real batch field size over the placeholder.
	cmd.WriteUint32At(uint32(cmd.dataOffset)-uint32(_MSG_TOTAL_HEADER_SIZE)-4, fieldSizeOffset)
	cmd.end()
	cmd.markCompressed(policy)

	return nil
}
671
672func (cmd *baseCommand) setBatchIndexRead(policy *BatchPolicy, records []*BatchRead, batch *batchNode) error {
673	offsets := batch.offsets
674	max := len(batch.offsets)
675	fieldCountRow := 1
676	if policy.SendSetName {
677		fieldCountRow = 2
678	}
679
680	// Estimate buffer size
681	cmd.begin()
682	fieldCount := 1
683	predSize := 0
684	if len(policy.PredExp) > 0 {
685		predSize = cmd.estimatePredExpSize(policy.PredExp)
686		fieldCount++
687	}
688
689	cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 5
690
691	var prev *BatchRead
692	for i := 0; i < max; i++ {
693		record := records[offsets[i]]
694		key := record.Key
695		binNames := record.BinNames
696
697		cmd.dataOffset += len(key.digest) + 4
698
699		// Try reference equality in hope that namespace/set for all keys is set from fixed variables.
700		if prev != nil && prev.Key.namespace == key.namespace &&
701			(!policy.SendSetName || prev.Key.setName == key.setName) &&
702			&prev.BinNames == &binNames && prev.ReadAllBins == record.ReadAllBins {
703			// Can set repeat previous namespace/bin names to save space.
704			cmd.dataOffset++
705		} else {
706			// Must write full header and namespace/set/bin names.
707			cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE) + 6
708
709			if policy.SendSetName {
710				cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE)
711			}
712
713			if len(binNames) != 0 {
714				for _, binName := range binNames {
715					cmd.estimateOperationSizeForBinName(binName)
716				}
717			}
718
719			prev = record
720		}
721	}
722
723	if err := cmd.sizeBuffer(policy.compress()); err != nil {
724		return err
725	}
726
727	readAttr := _INFO1_READ
728	if policy.ReadModeAP == ReadModeAPAll {
729		readAttr |= _INFO1_READ_MODE_AP_ALL
730	}
731
732	cmd.writeHeader(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, fieldCount, 0)
733	// cmd.writeHeader(&policy.BasePolicy, _INFO1_READ|_INFO1_BATCH, 0, 1, 0)
734
735	if len(policy.PredExp) > 0 {
736		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
737			return err
738		}
739	}
740
741	// Write real field size.
742	fieldSizeOffset := cmd.dataOffset
743	if policy.SendSetName {
744		cmd.writeFieldHeader(0, BATCH_INDEX_WITH_SET)
745	} else {
746		cmd.writeFieldHeader(0, BATCH_INDEX)
747	}
748
749	cmd.WriteUint32(uint32(max))
750
751	if policy.AllowInline {
752		cmd.WriteByte(1)
753	} else {
754		cmd.WriteByte(0)
755	}
756
757	prev = nil
758	for i := 0; i < max; i++ {
759		index := offsets[i]
760		cmd.WriteUint32(uint32(index))
761
762		record := records[index]
763		key := record.Key
764		binNames := record.BinNames
765		cmd.Write(key.digest[:])
766
767		// Try reference equality in hope that namespace/set for all keys is set from fixed variables.
768		if prev != nil && prev.Key.namespace == key.namespace &&
769			(!policy.SendSetName || prev.Key.setName == key.setName) &&
770			&prev.BinNames == &binNames && prev.ReadAllBins == record.ReadAllBins {
771			// Can set repeat previous namespace/bin names to save space.
772			cmd.WriteByte(1) // repeat
773		} else {
774			// Write full header, namespace and bin names.
775			cmd.WriteByte(0) // do not repeat
776			if len(binNames) > 0 {
777				cmd.WriteByte(byte(readAttr))
778				cmd.WriteUint16(uint16(fieldCountRow))
779				cmd.WriteUint16(uint16(len(binNames)))
780				cmd.writeFieldString(key.namespace, NAMESPACE)
781
782				if policy.SendSetName {
783					cmd.writeFieldString(key.setName, TABLE)
784				}
785
786				for _, binName := range binNames {
787					cmd.writeOperationForBinName(binName, _READ)
788				}
789			} else {
790				attr := byte(readAttr)
791				if record.ReadAllBins {
792					attr |= byte(_INFO1_GET_ALL)
793				} else {
794					attr |= byte(_INFO1_NOBINDATA)
795				}
796				cmd.WriteByte(attr)
797
798				cmd.WriteUint16(uint16(fieldCountRow))
799				cmd.WriteUint16(0)
800				cmd.writeFieldString(key.namespace, NAMESPACE)
801
802				if policy.SendSetName {
803					cmd.writeFieldString(key.setName, TABLE)
804				}
805			}
806
807			prev = record
808		}
809	}
810
811	cmd.WriteUint32At(uint32(cmd.dataOffset)-uint32(_MSG_TOTAL_HEADER_SIZE)-4, fieldSizeOffset)
812	cmd.end()
813	cmd.markCompressed(policy)
814
815	return nil
816}
817
// setScan writes the command for a scan over a namespace/set.
// namespace and setName are optional (nil skips the field); binNames, when
// non-nil, limits the bins returned; taskID identifies the scan job on the
// server.
func (cmd *baseCommand) setScan(policy *ScanPolicy, namespace *string, setName *string, binNames []string, taskID uint64) error {
	cmd.begin()
	fieldCount := 0

	// An optional predicate expression adds one field.
	predSize := 0
	if len(policy.PredExp) > 0 {
		predSize = cmd.estimatePredExpSize(policy.PredExp)
		fieldCount++
	}

	if namespace != nil {
		cmd.dataOffset += len(*namespace) + int(_FIELD_HEADER_SIZE)
		fieldCount++
	}

	if setName != nil {
		cmd.dataOffset += len(*setName) + int(_FIELD_HEADER_SIZE)
		fieldCount++
	}

	if policy.RecordsPerSecond > 0 {
		cmd.dataOffset += 4 + int(_FIELD_HEADER_SIZE)
		fieldCount++
	}

	// Estimate scan options size.
	cmd.dataOffset += 2 + int(_FIELD_HEADER_SIZE)
	fieldCount++

	// Estimate scan timeout size.
	cmd.dataOffset += 4 + int(_FIELD_HEADER_SIZE)
	fieldCount++

	// Allocate space for TaskId field.
	cmd.dataOffset += 8 + int(_FIELD_HEADER_SIZE)
	fieldCount++

	if binNames != nil {
		for i := range binNames {
			cmd.estimateOperationSizeForBinName(binNames[i])
		}
	}

	if err := cmd.sizeBuffer(false); err != nil {
		return err
	}
	readAttr := _INFO1_READ

	if !policy.IncludeBinData {
		readAttr |= _INFO1_NOBINDATA
	}

	operationCount := 0
	if binNames != nil {
		operationCount = len(binNames)
	}
	cmd.writeHeader(&policy.BasePolicy, readAttr, 0, fieldCount, operationCount)

	if namespace != nil {
		cmd.writeFieldString(*namespace, NAMESPACE)
	}

	if setName != nil {
		cmd.writeFieldString(*setName, TABLE)
	}

	if len(policy.PredExp) > 0 {
		if err := cmd.writePredExp(policy.PredExp, predSize); err != nil {
			return err
		}
	}

	if policy.RecordsPerSecond > 0 {
		cmd.writeFieldInt32(int32(policy.RecordsPerSecond), RECORDS_PER_SECOND)
	}

	// Scan options: priority in the high nibble, fail-on-cluster-change
	// flag in bit 3, then the scan percent byte.
	cmd.writeFieldHeader(2, SCAN_OPTIONS)
	priority := byte(policy.Priority)
	priority <<= 4

	if policy.FailOnClusterChange {
		priority |= 0x08
	}

	cmd.WriteByte(priority)
	cmd.WriteByte(byte(policy.ScanPercent))

	// Write scan timeout
	cmd.writeFieldHeader(4, SCAN_TIMEOUT)
	cmd.WriteInt32(int32(policy.SocketTimeout / time.Millisecond)) // in milliseconds

	cmd.writeFieldHeader(8, TRAN_ID)
	cmd.WriteUint64(taskID)

	if binNames != nil {
		for i := range binNames {
			cmd.writeOperationForBinName(binNames[i], _READ)
		}
	}

	cmd.end()

	return nil
}
922
923func (cmd *baseCommand) setQuery(policy *QueryPolicy, wpolicy *WritePolicy, statement *Statement, operations []*Operation, write bool) (err error) {
924	fieldCount := 0
925	filterSize := 0
926	binNameSize := 0
927	predSize := 0
928	predExp := statement.predExps
929
930	recordsPerSecond := 0
931	if !write {
932		recordsPerSecond = policy.RecordsPerSecond
933	}
934
935	cmd.begin()
936
937	if statement.Namespace != "" {
938		cmd.dataOffset += len(statement.Namespace) + int(_FIELD_HEADER_SIZE)
939		fieldCount++
940	}
941
942	if statement.IndexName != "" {
943		cmd.dataOffset += len(statement.IndexName) + int(_FIELD_HEADER_SIZE)
944		fieldCount++
945	}
946
947	if statement.SetName != "" {
948		cmd.dataOffset += len(statement.SetName) + int(_FIELD_HEADER_SIZE)
949		fieldCount++
950	}
951
952	// Allocate space for TaskId field.
953	cmd.dataOffset += 8 + int(_FIELD_HEADER_SIZE)
954	fieldCount++
955
956	if statement.Filter != nil {
957		idxType := statement.Filter.IndexCollectionType()
958
959		if idxType != ICT_DEFAULT {
960			cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 1
961			fieldCount++
962		}
963
964		cmd.dataOffset += int(_FIELD_HEADER_SIZE)
965		filterSize++ // num filters
966
967		sz, err := statement.Filter.EstimateSize()
968		if err != nil {
969			return err
970		}
971		filterSize += sz
972
973		cmd.dataOffset += filterSize
974		fieldCount++
975
976		// Query bin names are specified as a field (Scan bin names are specified later as operations)
977		if len(statement.BinNames) > 0 {
978			cmd.dataOffset += int(_FIELD_HEADER_SIZE)
979			binNameSize++ // num bin names
980
981			for _, binName := range statement.BinNames {
982				binNameSize += len(binName) + 1
983			}
984			cmd.dataOffset += binNameSize
985			fieldCount++
986		}
987	} else {
988		// Calling query with no filters is more efficiently handled by a primary index scan.
989		// Estimate scan options size.
990		cmd.dataOffset += (2 + int(_FIELD_HEADER_SIZE))
991		fieldCount++
992
993		// Estimate scan timeout size.
994		cmd.dataOffset += (4 + int(_FIELD_HEADER_SIZE))
995		fieldCount++
996
997		// Estimate records per second size.
998		if recordsPerSecond > 0 {
999			cmd.dataOffset += 4 + int(_FIELD_HEADER_SIZE)
1000			fieldCount++
1001		}
1002	}
1003
1004	if len(policy.PredExp) > 0 && len(predExp) == 0 {
1005		predExp = policy.PredExp
1006	}
1007
1008	if len(predExp) > 0 {
1009		predSize = cmd.estimatePredExpSize(predExp)
1010		fieldCount++
1011	}
1012
1013	var functionArgs *ValueArray
1014	if statement.functionName != "" {
1015		cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 1 // udf type
1016		cmd.dataOffset += len(statement.packageName) + int(_FIELD_HEADER_SIZE)
1017		cmd.dataOffset += len(statement.functionName) + int(_FIELD_HEADER_SIZE)
1018
1019		fasz := 0
1020		if len(statement.functionArgs) > 0 {
1021			functionArgs = NewValueArray(statement.functionArgs)
1022			fasz, err = functionArgs.EstimateSize()
1023			if err != nil {
1024				return err
1025			}
1026		}
1027
1028		cmd.dataOffset += int(_FIELD_HEADER_SIZE) + fasz
1029		fieldCount += 4
1030	}
1031
1032	// Operations (used in query execute) and bin names (used in scan/query) are mutually exclusive.
1033	if len(operations) > 0 {
1034		for _, op := range operations {
1035			cmd.estimateOperationSizeForOperation(op)
1036		}
1037	} else if len(statement.BinNames) > 0 && statement.Filter == nil {
1038		for _, binName := range statement.BinNames {
1039			cmd.estimateOperationSizeForBinName(binName)
1040		}
1041	}
1042
1043	if err := cmd.sizeBuffer(false); err != nil {
1044		return err
1045	}
1046
1047	operationCount := 0
1048	if len(operations) > 0 {
1049		operationCount = len(operations)
1050	} else if statement.Filter == nil && len(statement.BinNames) > 0 {
1051		operationCount = len(statement.BinNames)
1052	}
1053
1054	if write {
1055		cmd.writeHeaderWithPolicy(wpolicy, 0, _INFO2_WRITE, fieldCount, operationCount)
1056	} else {
1057		readAttr := _INFO1_READ | _INFO1_NOBINDATA
1058		if policy.IncludeBinData {
1059			readAttr = _INFO1_READ
1060		}
1061		cmd.writeHeader(&policy.BasePolicy, readAttr, 0, fieldCount, operationCount)
1062	}
1063
1064	if statement.Namespace != "" {
1065		cmd.writeFieldString(statement.Namespace, NAMESPACE)
1066	}
1067
1068	if statement.IndexName != "" {
1069		cmd.writeFieldString(statement.IndexName, INDEX_NAME)
1070	}
1071
1072	if statement.SetName != "" {
1073		cmd.writeFieldString(statement.SetName, TABLE)
1074	}
1075
1076	cmd.writeFieldHeader(8, TRAN_ID)
1077	cmd.WriteUint64(statement.TaskId)
1078
1079	if statement.Filter != nil {
1080		idxType := statement.Filter.IndexCollectionType()
1081
1082		if idxType != ICT_DEFAULT {
1083			cmd.writeFieldHeader(1, INDEX_TYPE)
1084			cmd.WriteByte(byte(idxType))
1085		}
1086
1087		cmd.writeFieldHeader(filterSize, INDEX_RANGE)
1088		cmd.WriteByte(byte(1)) // number of filters
1089
1090		_, err := statement.Filter.write(cmd)
1091		if err != nil {
1092			return err
1093		}
1094
1095		if len(statement.BinNames) > 0 {
1096			cmd.writeFieldHeader(binNameSize, QUERY_BINLIST)
1097			cmd.WriteByte(byte(len(statement.BinNames)))
1098
1099			for _, binName := range statement.BinNames {
1100				len := copy(cmd.dataBuffer[cmd.dataOffset+1:], binName)
1101				cmd.dataBuffer[cmd.dataOffset] = byte(len)
1102				cmd.dataOffset += len + 1
1103			}
1104		}
1105	} else {
1106		// Calling query with no filters is more efficiently handled by a primary index scan.
1107		cmd.writeFieldHeader(2, SCAN_OPTIONS)
1108		priority := byte(policy.Priority)
1109		priority <<= 4
1110
1111		if !write && policy.FailOnClusterChange {
1112			priority |= 0x08
1113		}
1114
1115		cmd.WriteByte(priority)
1116		cmd.WriteByte(byte(100))
1117
1118		// Write scan timeout
1119		cmd.writeFieldHeader(4, SCAN_TIMEOUT)
1120		cmd.WriteInt32(int32(policy.SocketTimeout / time.Millisecond)) // in milliseconds
1121
1122		// Write records per second.
1123		if recordsPerSecond > 0 {
1124			cmd.writeFieldInt32(int32(recordsPerSecond), RECORDS_PER_SECOND)
1125		}
1126	}
1127
1128	if len(predExp) > 0 {
1129		if err := cmd.writePredExp(predExp, predSize); err != nil {
1130			return err
1131		}
1132	}
1133
1134	if statement.functionName != "" {
1135		cmd.writeFieldHeader(1, UDF_OP)
1136		if statement.returnData {
1137			cmd.dataBuffer[cmd.dataOffset] = byte(1)
1138		} else {
1139			cmd.dataBuffer[cmd.dataOffset] = byte(2)
1140		}
1141		cmd.dataOffset++
1142
1143		cmd.writeFieldString(statement.packageName, UDF_PACKAGE_NAME)
1144		cmd.writeFieldString(statement.functionName, UDF_FUNCTION)
1145		cmd.writeUdfArgs(functionArgs)
1146	}
1147
1148	if len(operations) > 0 {
1149		for _, op := range operations {
1150			cmd.writeOperationForOperation(op)
1151		}
1152	} else if len(statement.BinNames) > 0 && statement.Filter == nil {
1153		// scan binNames come last
1154		for _, binName := range statement.BinNames {
1155			cmd.writeOperationForBinName(binName, _READ)
1156		}
1157	}
1158
1159	cmd.end()
1160
1161	return nil
1162}
1163
1164func (cmd *baseCommand) estimateKeySize(key *Key, sendKey bool) (int, error) {
1165	fieldCount := 0
1166
1167	if key.namespace != "" {
1168		cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE)
1169		fieldCount++
1170	}
1171
1172	if key.setName != "" {
1173		cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE)
1174		fieldCount++
1175	}
1176
1177	cmd.dataOffset += int(_DIGEST_SIZE + _FIELD_HEADER_SIZE)
1178	fieldCount++
1179
1180	if sendKey {
1181		// field header size + key size
1182		sz, err := key.userKey.EstimateSize()
1183		if err != nil {
1184			return sz, err
1185		}
1186		cmd.dataOffset += sz + int(_FIELD_HEADER_SIZE) + 1
1187		fieldCount++
1188	}
1189
1190	return fieldCount, nil
1191}
1192
1193func (cmd *baseCommand) estimateUdfSize(packageName string, functionName string, args *ValueArray) (int, error) {
1194	cmd.dataOffset += len(packageName) + int(_FIELD_HEADER_SIZE)
1195	cmd.dataOffset += len(functionName) + int(_FIELD_HEADER_SIZE)
1196
1197	sz, err := args.EstimateSize()
1198	if err != nil {
1199		return 0, err
1200	}
1201
1202	// fmt.Println(args, sz)
1203
1204	cmd.dataOffset += sz + int(_FIELD_HEADER_SIZE)
1205	return 3, nil
1206}
1207
1208func (cmd *baseCommand) estimateOperationSizeForBin(bin *Bin) error {
1209	cmd.dataOffset += len(bin.Name) + int(_OPERATION_HEADER_SIZE)
1210	sz, err := bin.Value.EstimateSize()
1211	if err != nil {
1212		return err
1213	}
1214	cmd.dataOffset += sz
1215	return nil
1216}
1217
1218func (cmd *baseCommand) estimateOperationSizeForBinNameAndValue(name string, value interface{}) error {
1219	cmd.dataOffset += len(name) + int(_OPERATION_HEADER_SIZE)
1220	sz, err := NewValue(value).EstimateSize()
1221	if err != nil {
1222		return err
1223	}
1224	cmd.dataOffset += sz
1225	return nil
1226}
1227
1228func (cmd *baseCommand) estimateOperationSizeForOperation(operation *Operation) error {
1229	binLen := len(operation.binName)
1230	cmd.dataOffset += binLen + int(_OPERATION_HEADER_SIZE)
1231
1232	if operation.encoder == nil {
1233		if operation.binValue != nil {
1234			sz, err := operation.binValue.EstimateSize()
1235			if err != nil {
1236				return err
1237			}
1238			cmd.dataOffset += sz
1239		}
1240	} else {
1241		sz, err := operation.encoder(operation, nil)
1242		if err != nil {
1243			return err
1244		}
1245		cmd.dataOffset += sz
1246	}
1247	return nil
1248}
1249
1250func (cmd *baseCommand) estimateOperationSizeForBinName(binName string) {
1251	cmd.dataOffset += len(binName) + int(_OPERATION_HEADER_SIZE)
1252}
1253
// estimateOperationSize advances dataOffset by the size of an operation
// that carries neither a bin name nor a value (header only).
func (cmd *baseCommand) estimateOperationSize() {
	cmd.dataOffset += int(_OPERATION_HEADER_SIZE)
}
1257
1258func (cmd *baseCommand) estimatePredExpSize(predExp []PredExp) int {
1259	sz := 0
1260	for _, predexp := range predExp {
1261		sz += predexp.marshaledSize()
1262	}
1263	cmd.dataOffset += sz + int(_FIELD_HEADER_SIZE)
1264	return sz
1265}
1266
1267// Generic header write.
1268func (cmd *baseCommand) writeHeader(policy *BasePolicy, readAttr int, writeAttr int, fieldCount int, operationCount int) {
1269	infoAttr := 0
1270
1271	switch policy.ReadModeSC {
1272	case ReadModeSCSession:
1273	case ReadModeSCLinearize:
1274		infoAttr |= _INFO3_SC_READ_TYPE
1275	case ReadModeSCAllowReplica:
1276		infoAttr |= _INFO3_SC_READ_RELAX
1277	case ReadModeSCAllowUnavailable:
1278		infoAttr |= _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
1279	}
1280
1281	if policy.ReadModeAP == ReadModeAPAll {
1282		readAttr |= _INFO1_READ_MODE_AP_ALL
1283	}
1284
1285	if policy.UseCompression {
1286		readAttr |= _INFO1_COMPRESS_RESPONSE
1287	}
1288
1289	// Write all header data except total size which must be written last.
1290	cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length.
1291	cmd.dataBuffer[9] = byte(readAttr)
1292	cmd.dataBuffer[10] = byte(writeAttr)
1293	cmd.dataBuffer[11] = byte(infoAttr)
1294
1295	for i := 12; i < 26; i++ {
1296		cmd.dataBuffer[i] = 0
1297	}
1298	cmd.dataOffset = 26
1299	cmd.WriteInt16(int16(fieldCount))
1300	cmd.WriteInt16(int16(operationCount))
1301	cmd.dataOffset = int(_MSG_TOTAL_HEADER_SIZE)
1302}
1303
// Header write for write operations.
// Translates the write policy (record-exists action, generation policy,
// commit level, durable delete and read modes) into the readAttr /
// writeAttr / infoAttr bit flags and lays out bytes 8..29 of the message
// header. The total size (bytes 0..7) is written later by end(), and the
// timeout slot (bytes 22..25) is filled in just before the buffer is sent.
func (cmd *baseCommand) writeHeaderWithPolicy(policy *WritePolicy, readAttr int, writeAttr int, fieldCount int, operationCount int) {
	// Set flags.
	generation := uint32(0)
	infoAttr := 0

	switch policy.RecordExistsAction {
	case UPDATE: // default server behavior; no flag needed
	case UPDATE_ONLY:
		infoAttr |= _INFO3_UPDATE_ONLY
	case REPLACE:
		infoAttr |= _INFO3_CREATE_OR_REPLACE
	case REPLACE_ONLY:
		infoAttr |= _INFO3_REPLACE_ONLY
	case CREATE_ONLY:
		writeAttr |= _INFO2_CREATE_ONLY
	}

	switch policy.GenerationPolicy {
	case NONE: // no generation check
	case EXPECT_GEN_EQUAL:
		generation = policy.Generation
		writeAttr |= _INFO2_GENERATION
	case EXPECT_GEN_GT:
		generation = policy.Generation
		writeAttr |= _INFO2_GENERATION_GT
	}

	if policy.CommitLevel == COMMIT_MASTER {
		infoAttr |= _INFO3_COMMIT_MASTER
	}

	if policy.DurableDelete {
		writeAttr |= _INFO2_DURABLE_DELETE
	}

	switch policy.ReadModeSC {
	case ReadModeSCSession: // default; no flag needed
	case ReadModeSCLinearize:
		infoAttr |= _INFO3_SC_READ_TYPE
	case ReadModeSCAllowReplica:
		infoAttr |= _INFO3_SC_READ_RELAX
	case ReadModeSCAllowUnavailable:
		infoAttr |= _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
	}

	if policy.ReadModeAP == ReadModeAPAll {
		readAttr |= _INFO1_READ_MODE_AP_ALL
	}

	if policy.UseCompression {
		readAttr |= _INFO1_COMPRESS_RESPONSE
	}

	// Write all header data except total size which must be written last.
	cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length.
	cmd.dataBuffer[9] = byte(readAttr)
	cmd.dataBuffer[10] = byte(writeAttr)
	cmd.dataBuffer[11] = byte(infoAttr)
	cmd.dataBuffer[12] = 0 // unused
	cmd.dataBuffer[13] = 0 // clear the result code
	cmd.dataOffset = 14
	cmd.WriteUint32(generation)
	cmd.dataOffset = 18
	cmd.WriteUint32(policy.Expiration)

	// Initialize timeout. It will be written later.
	cmd.dataBuffer[22] = 0
	cmd.dataBuffer[23] = 0
	cmd.dataBuffer[24] = 0
	cmd.dataBuffer[25] = 0

	cmd.dataOffset = 26
	cmd.WriteInt16(int16(fieldCount))
	cmd.WriteInt16(int16(operationCount))
	cmd.dataOffset = int(_MSG_TOTAL_HEADER_SIZE)
}
1381
1382func (cmd *baseCommand) writeKey(key *Key, sendKey bool) {
1383	// Write key into buffer.
1384	if key.namespace != "" {
1385		cmd.writeFieldString(key.namespace, NAMESPACE)
1386	}
1387
1388	if key.setName != "" {
1389		cmd.writeFieldString(key.setName, TABLE)
1390	}
1391
1392	cmd.writeFieldBytes(key.digest[:], DIGEST_RIPE)
1393
1394	if sendKey {
1395		cmd.writeFieldValue(key.userKey, KEY)
1396	}
1397}
1398
// writeOperationForBin writes a single bin operation: a header (4-byte
// size, op code, particle type, flags, name length) followed by the bin
// name and the serialized value.
func (cmd *baseCommand) writeOperationForBin(bin *Bin, operation OperationType) error {
	// Copy the bin name past the header slot first; the header is written
	// afterwards, once the value size is known.
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], bin.Name)

	valueLength, err := bin.Value.EstimateSize()
	if err != nil {
		return err
	}

	// Size covers the name, the value, and the 4 one-byte header fields below.
	cmd.WriteInt32(int32(nameLength + valueLength + 4))
	cmd.WriteByte((operation.op))
	cmd.WriteByte((byte(bin.Value.GetType())))
	cmd.WriteByte((byte(0)))
	cmd.WriteByte((byte(nameLength)))
	cmd.dataOffset += nameLength // skip over the already-copied name
	_, err = bin.Value.write(cmd)
	return err
}
1416
// writeOperationForBinNameAndValue writes a single operation from a raw
// name/value pair, wrapping the value via NewValue before serializing.
func (cmd *baseCommand) writeOperationForBinNameAndValue(name string, val interface{}, operation OperationType) error {
	// Copy the bin name past the header slot first; the header is written
	// afterwards, once the value size is known.
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], name)

	v := NewValue(val)

	valueLength, err := v.EstimateSize()
	if err != nil {
		return err
	}

	// Size covers the name, the value, and the 4 one-byte header fields below.
	cmd.WriteInt32(int32(nameLength + valueLength + 4))
	cmd.WriteByte((operation.op))
	cmd.WriteByte((byte(v.GetType())))
	cmd.WriteByte((byte(0)))
	cmd.WriteByte((byte(nameLength)))
	cmd.dataOffset += nameLength // skip over the already-copied name
	_, err = v.write(cmd)
	return err
}
1436
// writeOperationForOperation writes a single Operation: the operation
// header (size, op code, particle type, flags, name length), the bin
// name, then the value — either via the value's own serializer or via
// the operation's custom encoder.
func (cmd *baseCommand) writeOperationForOperation(operation *Operation) error {
	// Copy the bin name past the header slot first; the header is written
	// below, once the value size is known.
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], operation.binName)

	if operation.used {
		// cache will set the used flag to false again
		operation.cache()
	}

	if operation.encoder == nil {
		valueLength, err := operation.binValue.EstimateSize()
		if err != nil {
			return err
		}

		cmd.WriteInt32(int32(nameLength + valueLength + 4))
		cmd.WriteByte((operation.opType.op))
		cmd.WriteByte((byte(operation.binValue.GetType())))
		cmd.WriteByte((byte(0)))
		cmd.WriteByte((byte(nameLength)))
		cmd.dataOffset += nameLength
		_, err = operation.binValue.write(cmd)
		return err
	}

	// Encoder-backed operation: invoking the encoder with a nil buffer
	// only reports the encoded size.
	valueLength, err := operation.encoder(operation, nil)
	if err != nil {
		return err
	}

	cmd.WriteInt32(int32(nameLength + valueLength + 4))
	cmd.WriteByte((operation.opType.op))
	cmd.WriteByte((byte(ParticleType.BLOB)))
	cmd.WriteByte((byte(0)))
	cmd.WriteByte((byte(nameLength)))
	cmd.dataOffset += nameLength
	_, err = operation.encoder(operation, cmd)
	// mark the operation as used, so that it will be cached the next time it is used
	operation.used = err == nil
	return err
}
1477
// writeOperationForBinName writes an operation that carries a bin name
// but no value (e.g. reading a specific bin).
func (cmd *baseCommand) writeOperationForBinName(name string, operation OperationType) {
	// Copy the name past the header slot, then write the header.
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], name)
	cmd.WriteInt32(int32(nameLength + 4))
	cmd.WriteByte((operation.op))
	cmd.WriteByte(byte(0)) // particle type: none
	cmd.WriteByte(byte(0)) // flags
	cmd.WriteByte(byte(nameLength))
	cmd.dataOffset += nameLength // skip over the already-copied name
}
1487
1488func (cmd *baseCommand) writeOperationForOperationType(operation OperationType) {
1489	cmd.WriteInt32(int32(4))
1490	cmd.WriteByte(operation.op)
1491	cmd.WriteByte(0)
1492	cmd.WriteByte(0)
1493	cmd.WriteByte(0)
1494}
1495
1496func (cmd *baseCommand) writePredExp(predExp []PredExp, predSize int) error {
1497	cmd.writeFieldHeader(predSize, PREDEXP)
1498	for i := range predExp {
1499		if err := predExp[i].marshal(cmd); err != nil {
1500			return err
1501		}
1502	}
1503	return nil
1504}
1505
1506func (cmd *baseCommand) writeFieldValue(value Value, ftype FieldType) error {
1507	vlen, err := value.EstimateSize()
1508	if err != nil {
1509		return err
1510	}
1511	cmd.writeFieldHeader(vlen+1, ftype)
1512	cmd.WriteByte(byte(value.GetType()))
1513
1514	_, err = value.write(cmd)
1515	return err
1516}
1517
1518func (cmd *baseCommand) writeUdfArgs(value *ValueArray) error {
1519	if value != nil {
1520		vlen, err := value.EstimateSize()
1521		if err != nil {
1522			return err
1523		}
1524		cmd.writeFieldHeader(vlen, UDF_ARGLIST)
1525		_, err = value.pack(cmd)
1526		return err
1527	}
1528
1529	cmd.writeFieldHeader(0, UDF_ARGLIST)
1530	return nil
1531
1532}
1533
// writeFieldInt32 writes a field whose payload is a 4-byte big-endian int32.
func (cmd *baseCommand) writeFieldInt32(val int32, ftype FieldType) {
	cmd.writeFieldHeader(4, ftype)
	cmd.WriteInt32(val)
}
1538
// writeFieldInt64 writes a field whose payload is an 8-byte big-endian int64.
func (cmd *baseCommand) writeFieldInt64(val int64, ftype FieldType) {
	cmd.writeFieldHeader(8, ftype)
	cmd.WriteInt64(val)
}
1543
1544func (cmd *baseCommand) writeFieldString(str string, ftype FieldType) {
1545	len := copy(cmd.dataBuffer[(cmd.dataOffset+int(_FIELD_HEADER_SIZE)):], str)
1546	cmd.writeFieldHeader(len, ftype)
1547	cmd.dataOffset += len
1548}
1549
1550func (cmd *baseCommand) writeFieldBytes(bytes []byte, ftype FieldType) {
1551	copy(cmd.dataBuffer[cmd.dataOffset+int(_FIELD_HEADER_SIZE):], bytes)
1552
1553	cmd.writeFieldHeader(len(bytes), ftype)
1554	cmd.dataOffset += len(bytes)
1555}
1556
1557func (cmd *baseCommand) writeFieldHeader(size int, ftype FieldType) {
1558	cmd.WriteInt32(int32(size + 1))
1559	cmd.WriteByte((byte(ftype)))
1560}
1561
1562// Int64ToBytes converts an int64 into slice of Bytes.
1563func (cmd *baseCommand) WriteInt64(num int64) (int, error) {
1564	return cmd.WriteUint64(uint64(num))
1565}
1566
1567// Uint64ToBytes converts an uint64 into slice of Bytes.
1568func (cmd *baseCommand) WriteUint64(num uint64) (int, error) {
1569	binary.BigEndian.PutUint64(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+8], num)
1570	cmd.dataOffset += 8
1571	return 8, nil
1572}
1573
1574// Int32ToBytes converts an int32 to a byte slice of size 4
1575func (cmd *baseCommand) WriteInt32(num int32) (int, error) {
1576	return cmd.WriteUint32(uint32(num))
1577}
1578
1579// Uint32ToBytes converts an uint32 to a byte slice of size 4
1580func (cmd *baseCommand) WriteUint32(num uint32) (int, error) {
1581	binary.BigEndian.PutUint32(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+4], num)
1582	cmd.dataOffset += 4
1583	return 4, nil
1584}
1585
1586// Uint32ToBytes converts an uint32 to a byte slice of size 4
1587func (cmd *baseCommand) WriteUint32At(num uint32, index int) (int, error) {
1588	binary.BigEndian.PutUint32(cmd.dataBuffer[index:index+4], num)
1589	return 4, nil
1590}
1591
1592// Int16ToBytes converts an int16 to slice of bytes
1593func (cmd *baseCommand) WriteInt16(num int16) (int, error) {
1594	return cmd.WriteUint16(uint16(num))
1595}
1596
1597// Int16ToBytes converts an int16 to slice of bytes
1598func (cmd *baseCommand) WriteUint16(num uint16) (int, error) {
1599	binary.BigEndian.PutUint16(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+2], num)
1600	cmd.dataOffset += 2
1601	return 2, nil
1602}
1603
1604func (cmd *baseCommand) WriteFloat32(float float32) (int, error) {
1605	bits := math.Float32bits(float)
1606	binary.BigEndian.PutUint32(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+4], bits)
1607	cmd.dataOffset += 4
1608	return 4, nil
1609}
1610
1611func (cmd *baseCommand) WriteFloat64(float float64) (int, error) {
1612	bits := math.Float64bits(float)
1613	binary.BigEndian.PutUint64(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+8], bits)
1614	cmd.dataOffset += 8
1615	return 8, nil
1616}
1617
// WriteByte appends a single byte to the buffer and advances the offset.
func (cmd *baseCommand) WriteByte(b byte) error {
	cmd.dataBuffer[cmd.dataOffset] = b
	cmd.dataOffset++
	return nil
}
1623
1624func (cmd *baseCommand) WriteString(s string) (int, error) {
1625	copy(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+len(s)], s)
1626	cmd.dataOffset += len(s)
1627	return len(s), nil
1628}
1629
1630func (cmd *baseCommand) Write(b []byte) (int, error) {
1631	copy(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+len(b)], b)
1632	cmd.dataOffset += len(b)
1633	return len(b), nil
1634}
1635
// begin resets the write offset to just past the fixed message header,
// leaving room for the header to be filled in later by writeHeader*/end.
func (cmd *baseCommand) begin() {
	cmd.dataOffset = int(_MSG_TOTAL_HEADER_SIZE)
}
1639
// sizeBuffer ensures dataBuffer can hold the message built so far
// (dataOffset bytes), adding front padding when compression is requested.
func (cmd *baseCommand) sizeBuffer(compress bool) error {
	return cmd.sizeBufferSz(cmd.dataOffset, compress)
}
1643
1644func (cmd *baseCommand) validateHeader(header int64) error {
1645	msgVersion := (uint64(header) & 0xFF00000000000000) >> 56
1646	if msgVersion != 2 {
1647		return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid Message Header: Expected version to be 2, but got %v", msgVersion))
1648	}
1649
1650	msgType := (uint64(header) & 0x00FF000000000000) >> 49
1651	if !(msgType == 1 || msgType == 3 || msgType == 4) {
1652		return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid Message Header: Expected type to be 1, 3 or 4, but got %v", msgType))
1653	}
1654
1655	msgSize := header & 0x0000FFFFFFFFFFFF
1656	if msgSize > int64(MaxBufferSize) {
1657		return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid Message Header: Expected size to be under 10MiB, but got %v", msgSize))
1658	}
1659
1660	return nil
1661}
1662
var (
	// MaxBufferSize protects against allocating massive memory blocks
	// for buffers when a corrupted or adversarial stream advertises a
	// huge payload size. Tweak this number if you are returning a lot of
	// LDT elements in your queries.
	MaxBufferSize = 1024 * 1024 * 120 // 120 MB
)
1669
const (
	// msgHeaderPad reserves room at the front of the buffer for the
	// compressed proto header written by compress():
	// 8 bytes proto + 8 bytes original (uncompressed) size.
	msgHeaderPad = 16
	// zlibHeaderPad reserves room for the zlib stream header.
	zlibHeaderPad = 2
)
1674
// sizeBufferSz makes sure dataBuffer is at least size bytes long,
// reallocating only when capacity is insufficient. When willCompress is
// set, extra room is reserved at the front for the compressed proto
// header and the zlib stream header, and dataBuffer is re-pointed past
// that padding while the full buffer is retained in dataBufferCompress.
func (cmd *baseCommand) sizeBufferSz(size int, willCompress bool) error {

	if willCompress {
		// adds zlib and proto pads to the size of the buffer
		size += msgHeaderPad + zlibHeaderPad
	}

	// Corrupted data streams can result in a huge length.
	// Do a sanity check here.
	if size > MaxBufferSize || size < 0 {
		return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid size for buffer: %d", size))
	}

	if size <= len(cmd.dataBuffer) {
		// don't touch the buffer
	} else if size <= cap(cmd.dataBuffer) {
		// reslice within existing capacity; no allocation
		cmd.dataBuffer = cmd.dataBuffer[:size]
	} else {
		// not enough space
		cmd.dataBuffer = make([]byte, size)
	}

	// The trick here to keep a ref to the buffer, and set the buffer itself
	// to a padded version of the original:
	// | Proto Header | Original Compressed Size | compressed message |
	// |    8 Bytes   |          8 Bytes         |                    |
	if willCompress {
		cmd.dataBufferCompress = cmd.dataBuffer
		cmd.dataBuffer = cmd.dataBufferCompress[msgHeaderPad+zlibHeaderPad:]
	}

	return nil
}
1708
// end finalizes the message by writing the 8-byte proto header at the
// start of the buffer: version and type in the top two bytes, and the
// size of everything after the header in the low 48 bits.
func (cmd *baseCommand) end() {
	var proto = int64(cmd.dataOffset-8) | (_CL_MSG_VERSION << 56) | (_AS_MSG_TYPE << 48)
	binary.BigEndian.PutUint64(cmd.dataBuffer[0:], uint64(proto))
}
1713
// markCompressed records whether the policy requests client-side
// compression of the outgoing message buffer.
func (cmd *baseCommand) markCompressed(policy Policy) {
	cmd.compressed = policy.compress()
}
1717
1718func (cmd *baseCommand) compress() error {
1719	if cmd.compressed && cmd.dataOffset > _COMPRESS_THRESHOLD {
1720		b := bytes.NewBuffer(cmd.dataBufferCompress[msgHeaderPad:])
1721		b.Reset()
1722		w := zlib.NewWriter(b)
1723
1724		// There seems to be a bug either in Go's zlib or in zlibc
1725		// which messes up a single write block of bigger than 64KB to
1726		// the deflater.
1727		// Things work in multiple writes of 64KB though, so this is
1728		// how we're going to do it.
1729		i := 0
1730		const step = 64 * 1024
1731		for i+step < cmd.dataOffset {
1732			n, err := w.Write(cmd.dataBuffer[i : i+step])
1733			i += n
1734			if err != nil {
1735				return err
1736			}
1737		}
1738
1739		if i < cmd.dataOffset {
1740			_, err := w.Write(cmd.dataBuffer[i:cmd.dataOffset])
1741			if err != nil {
1742				return err
1743			}
1744		}
1745
1746		// flush
1747		w.Close()
1748
1749		compressedSz := b.Len()
1750
1751		// Use compressed buffer if compression completed within original buffer size.
1752		var proto = int64(compressedSz+8) | (_CL_MSG_VERSION << 56) | (_AS_MSG_TYPE_COMPRESSED << 48)
1753		binary.BigEndian.PutUint64(cmd.dataBufferCompress[0:], uint64(proto))
1754		binary.BigEndian.PutUint64(cmd.dataBufferCompress[8:], uint64(cmd.dataOffset))
1755
1756		cmd.dataBuffer = cmd.dataBufferCompress
1757		cmd.dataOffset = compressedSz + 16
1758		cmd.dataBufferCompress = nil
1759	}
1760
1761	return nil
1762}
1763
// compressedSize returns the length of the compressed buffer as encoded
// in the proto header's low 48 bits.
// If the buffer is not compressed, the result will be -1.
func (cmd *baseCommand) compressedSize() int {
	proto := Buffer.BytesToInt64(cmd.dataBuffer, 0)
	size := proto & 0xFFFFFFFFFFFF

	// The message type lives in bits 48-55 of the proto header.
	msgType := (proto >> 48) & 0xff

	if msgType != _AS_MSG_TYPE_COMPRESSED {
		return -1
	}

	return int(size)
}
1778
1779////////////////////////////////////
1780
1781func setInDoubt(err error, isRead bool, commandSentCounter int) error {
1782	// set inDoubt flag
1783	if ae, ok := err.(AerospikeError); ok {
1784		ae.SetInDoubt(isRead, commandSentCounter)
1785		return ae
1786	}
1787
1788	return err
1789}
1790
// execute runs the command with the policy's deadline. The iteration
// counter starts at -1 so the first attempt is not counted as a retry.
func (cmd *baseCommand) execute(ifc command, isRead bool) error {
	policy := ifc.getPolicy(ifc).GetBasePolicy()
	deadline := policy.deadline()

	return cmd.executeAt(ifc, policy, isRead, deadline, -1, 0)
}
1797
// executeAt is the command retry loop: pick a node, acquire a
// connection, build and send the buffer, then parse the reply — retrying
// on transient failures until success, the deadline passes, or the retry
// budget is exhausted. iterations is incremented at the top of each pass,
// so callers start it at -1 to get one non-retry attempt.
func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, isRead bool, deadline time.Time, iterations, commandSentCounter int) (err error) {
	// for exponential backoff
	interval := policy.SleepBetweenRetries

	shouldSleep := false
	isClientTimeout := false

	// Execute command until successful, timed out or maximum iterations have been reached.
	for {
		iterations++

		// too many retries
		if (policy.MaxRetries <= 0 && iterations > 0) || (policy.MaxRetries > 0 && iterations > policy.MaxRetries) {
			if ae, ok := err.(AerospikeError); ok {
				err = NewAerospikeError(ae.ResultCode(), fmt.Sprintf("command execution timed out on client: Exceeded number of retries. See `Policy.MaxRetries`. (last error: %s)", err.Error()))
			}

			return setInDoubt(err, isRead, commandSentCounter)
		}

		// Sleep before trying again, after the first iteration
		if policy.SleepBetweenRetries > 0 && shouldSleep {
			// Do not sleep if you know you'll wake up after the deadline
			if policy.TotalTimeout > 0 && time.Now().Add(interval).After(deadline) {
				break
			}

			time.Sleep(interval)
			// exponential backoff: grow the interval by the multiplier
			if policy.SleepMultiplier > 1 {
				interval = time.Duration(float64(interval) * policy.SleepMultiplier)
			}
		}

		// shouldSleep doubles as "this is a retry, not the first attempt".
		if shouldSleep {
			aerr, ok := err.(AerospikeError)
			if !ifc.prepareRetry(ifc, isClientTimeout || (ok && aerr.ResultCode() != SERVER_NOT_AVAILABLE)) {
				if bc, ok := ifc.(batcher); ok {
					// Batch may be retried in separate commands.
					if retry, err := bc.retryBatch(bc, cmd.node.cluster, deadline, iterations, commandSentCounter); retry {
						// Batch was retried in separate commands. Complete this command.
						return err
					}
				}
			}
		}

		// NOTE: This is important to be after the prepareRetry block above
		isClientTimeout = false

		shouldSleep = true

		// check for command timeout
		if policy.TotalTimeout > 0 && time.Now().After(deadline) {
			break
		}

		// set command node, so when you return a record it has the node
		cmd.node, err = ifc.getNode(ifc)
		if cmd.node == nil || !cmd.node.IsActive() || err != nil {
			isClientTimeout = true

			// Node is currently inactive. Retry.
			continue
		}

		cmd.conn, err = ifc.getConnection(policy)
		if err != nil {
			isClientTimeout = true

			if err == ErrConnectionPoolEmpty {
				// an empty connection pool means the transaction was never
				// attempted, so this pass should not consume a retry
				iterations--
			}
			Logger.Debug("Node " + cmd.node.String() + ": " + err.Error())
			continue
		}

		// Assign the connection buffer to the command buffer
		cmd.dataBuffer = cmd.conn.dataBuffer

		// Set command buffer.
		err = ifc.writeBuffer(ifc)
		if err != nil {
			// All runtime exceptions are considered fatal. Do not retry.
			// Close socket to flush out possible garbage. Do not put back in pool.
			cmd.conn.Close()
			cmd.conn = nil
			return err
		}

		// Reset timeout in send buffer (destined for server) and socket.
		binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0)
		if !deadline.IsZero() {
			serverTimeout := deadline.Sub(time.Now())
			if serverTimeout < time.Millisecond {
				serverTimeout = time.Millisecond
			}
			binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond))
		}

		// now that the deadline has been set in the buffer, compress the contents
		if err = cmd.compress(); err != nil {
			return NewAerospikeError(SERIALIZE_ERROR, err.Error())
		}

		// Send command.
		_, err = cmd.conn.Write(cmd.dataBuffer[:cmd.dataOffset])
		if err != nil {
			isClientTimeout = true

			// IO errors are considered temporary anomalies. Retry.
			// Close socket to flush out possible garbage. Do not put back in pool.
			cmd.conn.Close()
			cmd.conn = nil

			Logger.Debug("Node " + cmd.node.String() + ": " + err.Error())
			continue
		}
		commandSentCounter++

		// Parse results.
		err = ifc.parseResult(ifc, cmd.conn)
		if err != nil {
			if _, ok := err.(net.Error); err == ErrTimeout || err == io.EOF || ok {
				isClientTimeout = true
				// a server-reported TIMEOUT means the server saw the
				// transaction; treat it as a server timeout, not a client one
				if err != ErrTimeout {
					if aerr, ok := err.(AerospikeError); ok && aerr.ResultCode() == TIMEOUT {
						isClientTimeout = false
					}
				}

				// IO errors are considered temporary anomalies. Retry.
				// Close socket to flush out possible garbage. Do not put back in pool.
				cmd.conn.Close()

				Logger.Debug("Node " + cmd.node.String() + ": " + err.Error())

				// retry only for non-streaming commands
				if !cmd.oneShot {
					cmd.conn = nil
					continue
				}
			}

			// close the connection
			// cancelling/closing the batch/multi commands will return an error, which will
			// close the connection to throw away its data and signal the server about the
			// situation. We will not put back the connection in the buffer.
			if cmd.conn.IsConnected() && KeepConnection(err) {
				// Put connection back in pool.
				cmd.node.PutConnection(cmd.conn)
			} else {
				cmd.conn.Close()
				cmd.conn = nil
			}

			return setInDoubt(err, isRead, commandSentCounter)
		}

		// in case it has grown and re-allocated
		if len(cmd.dataBufferCompress) > len(cmd.dataBuffer) {
			cmd.conn.dataBuffer = cmd.dataBufferCompress
		} else {
			cmd.conn.dataBuffer = cmd.dataBuffer
		}

		// Put connection back in pool.
		// cmd.node.PutConnection(cmd.conn)
		ifc.putConnection(cmd.conn)

		// command has completed successfully.  Exit method.
		return nil

	}

	// execution timeout
	return ErrTimeout
}
1977
// parseRecordResults must be overridden by concrete command types; the
// base implementation should never be reached at runtime.
func (cmd *baseCommand) parseRecordResults(ifc command, receiveSize int) (bool, error) {
	panic("Abstract method. Should not end up here")
}
1981