1// Copyright 2013-2020 Aerospike, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aerospike 16 17import ( 18 "reflect" 19 20 . "github.com/aerospike/aerospike-client-go/types" 21 Buffer "github.com/aerospike/aerospike-client-go/utils/buffer" 22) 23 24type batchCommandGet struct { 25 batchCommand 26 27 keys []*Key 28 binNames []string 29 records []*Record 30 indexRecords []*BatchRead 31 readAttr int 32 index int 33 key Key 34 35 // pointer to the object that's going to be unmarshalled 36 objects []*reflect.Value 37 objectsFound []bool 38} 39 40// this method uses reflection. 41// Will not be set if performance flag is passed for the build. 42var batchObjectParser func( 43 cmd *batchCommandGet, 44 offset int, 45 opCount int, 46 fieldCount int, 47 generation uint32, 48 expiration uint32, 49) error 50 51func newBatchCommandGet( 52 node *Node, 53 batch *batchNode, 54 policy *BatchPolicy, 55 keys []*Key, 56 binNames []string, 57 records []*Record, 58 readAttr int, 59) *batchCommandGet { 60 res := &batchCommandGet{ 61 batchCommand: batchCommand{ 62 baseMultiCommand: *newMultiCommand(node, nil), 63 policy: policy, 64 batch: batch, 65 }, 66 keys: keys, 67 binNames: binNames, 68 records: records, 69 readAttr: readAttr, 70 } 71 res.oneShot = false 72 return res 73} 74 75func (cmd *batchCommandGet) cloneBatchCommand(batch *batchNode) batcher { 76 res := *cmd 77 res.node = batch.Node 78 res.batch = batch 79 80 return &res 81} 82 83func (cmd *batchCommandGet) writeBuffer(ifc command) error { 84 return cmd.setBatchIndexReadCompat(cmd.policy, cmd.keys, cmd.batch, cmd.binNames, cmd.readAttr) 85} 86 87// On batch operations the key values are not returned from the server 88// So we reuse the Key on the batch Object 89func (cmd *batchCommandGet) parseKey(fieldCount int) error { 90 // var digest [20]byte 91 // var namespace, setName string 92 // var userKey Value 93 var err error 94 95 for i := 0; i < fieldCount; i++ { 96 if err = cmd.readBytes(4); err != nil { 97 return err 98 } 99 100 fieldlen := int(Buffer.BytesToUint32(cmd.dataBuffer, 0)) 101 if err = cmd.readBytes(fieldlen); err != nil { 102 return err 103 } 104 105 fieldtype := FieldType(cmd.dataBuffer[0]) 106 size := fieldlen - 1 107 108 switch fieldtype { 109 case DIGEST_RIPE: 110 copy(cmd.key.digest[:], cmd.dataBuffer[1:size+1]) 111 case NAMESPACE: 112 cmd.key.namespace = string(cmd.dataBuffer[1 : size+1]) 113 case TABLE: 114 cmd.key.setName = string(cmd.dataBuffer[1 : size+1]) 115 case KEY: 116 if cmd.key.userKey, err = bytesToKeyValue(int(cmd.dataBuffer[1]), cmd.dataBuffer, 2, size-1); err != nil { 117 return err 118 } 119 } 120 } 121 122 return nil 123} 124 125// Parse all results in the batch. Add records to shared list. 126// If the record was not found, the bins will be nil. 127func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bool, error) { 128 //Parse each message response and add it to the result array 129 cmd.dataOffset = 0 130 131 for cmd.dataOffset < receiveSize { 132 if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil { 133 return false, err 134 } 135 resultCode := ResultCode(cmd.dataBuffer[5] & 0xFF) 136 137 // The only valid server return codes are "ok" and "not found" and "filtered out". 138 // If other return codes are received, then abort the batch. 139 if resultCode != 0 && resultCode != KEY_NOT_FOUND_ERROR { 140 if resultCode == FILTERED_OUT { 141 cmd.filteredOutCnt++ 142 } else { 143 return false, NewAerospikeError(resultCode) 144 } 145 } 146 147 info3 := int(cmd.dataBuffer[3]) 148 149 // If cmd is the end marker of the response, do not proceed further 150 if (info3 & _INFO3_LAST) == _INFO3_LAST { 151 return false, nil 152 } 153 154 generation := Buffer.BytesToUint32(cmd.dataBuffer, 6) 155 expiration := TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10)) 156 batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14)) 157 fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18)) 158 opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20)) 159 err := cmd.parseKey(fieldCount) 160 if err != nil { 161 return false, err 162 } 163 164 var offset int 165 offset = batchIndex 166 167 if cmd.indexRecords != nil { 168 if len(cmd.indexRecords) > 0 { 169 if resultCode == 0 { 170 if cmd.indexRecords[offset].Record, err = cmd.parseRecord(cmd.indexRecords[offset].Key, opCount, generation, expiration); err != nil { 171 return false, err 172 } 173 } 174 } 175 } else { 176 if resultCode == 0 { 177 if cmd.objects == nil { 178 if cmd.records[offset], err = cmd.parseRecord(cmd.keys[offset], opCount, generation, expiration); err != nil { 179 return false, err 180 } 181 } else if batchObjectParser != nil { 182 // mark it as found 183 cmd.objectsFound[offset] = true 184 if err := batchObjectParser(cmd, offset, opCount, fieldCount, generation, expiration); err != nil { 185 return false, err 186 187 } 188 } 189 } 190 } 191 } 192 return true, nil 193} 194 195// Parses the given byte buffer and populate the result object. 196// Returns the number of bytes that were parsed from the given buffer. 197func (cmd *batchCommandGet) parseRecord(key *Key, opCount int, generation, expiration uint32) (*Record, error) { 198 bins := make(BinMap, opCount) 199 200 for i := 0; i < opCount; i++ { 201 if err := cmd.readBytes(8); err != nil { 202 return nil, err 203 } 204 opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0)) 205 particleType := int(cmd.dataBuffer[5]) 206 nameSize := int(cmd.dataBuffer[7]) 207 208 if err := cmd.readBytes(nameSize); err != nil { 209 return nil, err 210 } 211 name := string(cmd.dataBuffer[:nameSize]) 212 213 particleBytesSize := opSize - (4 + nameSize) 214 if err := cmd.readBytes(particleBytesSize); err != nil { 215 return nil, err 216 } 217 value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize) 218 if err != nil { 219 return nil, err 220 } 221 222 bins[name] = value 223 } 224 225 return newRecord(cmd.node, key, bins, generation, expiration), nil 226} 227 228func (cmd *batchCommandGet) Execute() error { 229 return cmd.execute(cmd, true) 230} 231 232func (cmd *batchCommandGet) generateBatchNodes(cluster *Cluster) ([]*batchNode, error) { 233 return newBatchNodeListKeys(cluster, cmd.policy, cmd.keys, cmd.sequenceAP, cmd.sequenceSC, cmd.batch) 234} 235