1// Copyright 2013-2020 Aerospike, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aerospike
16
17import (
18	"reflect"
19
20	. "github.com/aerospike/aerospike-client-go/types"
21	Buffer "github.com/aerospike/aerospike-client-go/utils/buffer"
22)
23
24type batchCommandGet struct {
25	batchCommand
26
27	keys         []*Key
28	binNames     []string
29	records      []*Record
30	indexRecords []*BatchRead
31	readAttr     int
32	index        int
33	key          Key
34
35	// pointer to the object that's going to be unmarshalled
36	objects      []*reflect.Value
37	objectsFound []bool
38}
39
40// this method uses reflection.
41// Will not be set if performance flag is passed for the build.
42var batchObjectParser func(
43	cmd *batchCommandGet,
44	offset int,
45	opCount int,
46	fieldCount int,
47	generation uint32,
48	expiration uint32,
49) error
50
51func newBatchCommandGet(
52	node *Node,
53	batch *batchNode,
54	policy *BatchPolicy,
55	keys []*Key,
56	binNames []string,
57	records []*Record,
58	readAttr int,
59) *batchCommandGet {
60	res := &batchCommandGet{
61		batchCommand: batchCommand{
62			baseMultiCommand: *newMultiCommand(node, nil),
63			policy:           policy,
64			batch:            batch,
65		},
66		keys:     keys,
67		binNames: binNames,
68		records:  records,
69		readAttr: readAttr,
70	}
71	res.oneShot = false
72	return res
73}
74
75func (cmd *batchCommandGet) cloneBatchCommand(batch *batchNode) batcher {
76	res := *cmd
77	res.node = batch.Node
78	res.batch = batch
79
80	return &res
81}
82
83func (cmd *batchCommandGet) writeBuffer(ifc command) error {
84	return cmd.setBatchIndexReadCompat(cmd.policy, cmd.keys, cmd.batch, cmd.binNames, cmd.readAttr)
85}
86
87// On batch operations the key values are not returned from the server
88// So we reuse the Key on the batch Object
89func (cmd *batchCommandGet) parseKey(fieldCount int) error {
90	// var digest [20]byte
91	// var namespace, setName string
92	// var userKey Value
93	var err error
94
95	for i := 0; i < fieldCount; i++ {
96		if err = cmd.readBytes(4); err != nil {
97			return err
98		}
99
100		fieldlen := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
101		if err = cmd.readBytes(fieldlen); err != nil {
102			return err
103		}
104
105		fieldtype := FieldType(cmd.dataBuffer[0])
106		size := fieldlen - 1
107
108		switch fieldtype {
109		case DIGEST_RIPE:
110			copy(cmd.key.digest[:], cmd.dataBuffer[1:size+1])
111		case NAMESPACE:
112			cmd.key.namespace = string(cmd.dataBuffer[1 : size+1])
113		case TABLE:
114			cmd.key.setName = string(cmd.dataBuffer[1 : size+1])
115		case KEY:
116			if cmd.key.userKey, err = bytesToKeyValue(int(cmd.dataBuffer[1]), cmd.dataBuffer, 2, size-1); err != nil {
117				return err
118			}
119		}
120	}
121
122	return nil
123}
124
125// Parse all results in the batch.  Add records to shared list.
126// If the record was not found, the bins will be nil.
127func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bool, error) {
128	//Parse each message response and add it to the result array
129	cmd.dataOffset = 0
130
131	for cmd.dataOffset < receiveSize {
132		if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
133			return false, err
134		}
135		resultCode := ResultCode(cmd.dataBuffer[5] & 0xFF)
136
137		// The only valid server return codes are "ok" and "not found" and "filtered out".
138		// If other return codes are received, then abort the batch.
139		if resultCode != 0 && resultCode != KEY_NOT_FOUND_ERROR {
140			if resultCode == FILTERED_OUT {
141				cmd.filteredOutCnt++
142			} else {
143				return false, NewAerospikeError(resultCode)
144			}
145		}
146
147		info3 := int(cmd.dataBuffer[3])
148
149		// If cmd is the end marker of the response, do not proceed further
150		if (info3 & _INFO3_LAST) == _INFO3_LAST {
151			return false, nil
152		}
153
154		generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
155		expiration := TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
156		batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
157		fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
158		opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
159		err := cmd.parseKey(fieldCount)
160		if err != nil {
161			return false, err
162		}
163
164		var offset int
165		offset = batchIndex
166
167		if cmd.indexRecords != nil {
168			if len(cmd.indexRecords) > 0 {
169				if resultCode == 0 {
170					if cmd.indexRecords[offset].Record, err = cmd.parseRecord(cmd.indexRecords[offset].Key, opCount, generation, expiration); err != nil {
171						return false, err
172					}
173				}
174			}
175		} else {
176			if resultCode == 0 {
177				if cmd.objects == nil {
178					if cmd.records[offset], err = cmd.parseRecord(cmd.keys[offset], opCount, generation, expiration); err != nil {
179						return false, err
180					}
181				} else if batchObjectParser != nil {
182					// mark it as found
183					cmd.objectsFound[offset] = true
184					if err := batchObjectParser(cmd, offset, opCount, fieldCount, generation, expiration); err != nil {
185						return false, err
186
187					}
188				}
189			}
190		}
191	}
192	return true, nil
193}
194
195// Parses the given byte buffer and populate the result object.
196// Returns the number of bytes that were parsed from the given buffer.
197func (cmd *batchCommandGet) parseRecord(key *Key, opCount int, generation, expiration uint32) (*Record, error) {
198	bins := make(BinMap, opCount)
199
200	for i := 0; i < opCount; i++ {
201		if err := cmd.readBytes(8); err != nil {
202			return nil, err
203		}
204		opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
205		particleType := int(cmd.dataBuffer[5])
206		nameSize := int(cmd.dataBuffer[7])
207
208		if err := cmd.readBytes(nameSize); err != nil {
209			return nil, err
210		}
211		name := string(cmd.dataBuffer[:nameSize])
212
213		particleBytesSize := opSize - (4 + nameSize)
214		if err := cmd.readBytes(particleBytesSize); err != nil {
215			return nil, err
216		}
217		value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize)
218		if err != nil {
219			return nil, err
220		}
221
222		bins[name] = value
223	}
224
225	return newRecord(cmd.node, key, bins, generation, expiration), nil
226}
227
228func (cmd *batchCommandGet) Execute() error {
229	return cmd.execute(cmd, true)
230}
231
232func (cmd *batchCommandGet) generateBatchNodes(cluster *Cluster) ([]*batchNode, error) {
233	return newBatchNodeListKeys(cluster, cmd.policy, cmd.keys, cmd.sequenceAP, cmd.sequenceSC, cmd.batch)
234}
235