1// Copyright 2013-2020 Aerospike, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aerospike 16 17import ( 18 "fmt" 19 "reflect" 20 "time" 21 22 . "github.com/aerospike/aerospike-client-go/types" 23 xrand "github.com/aerospike/aerospike-client-go/types/rand" 24 Buffer "github.com/aerospike/aerospike-client-go/utils/buffer" 25) 26 27type baseMultiCommand struct { 28 baseCommand 29 30 namespace string 31 clusterKey int64 32 first bool 33 34 terminationError ResultCode 35 36 recordset *Recordset 37 38 terminationErrorType ResultCode 39 40 errChan chan error 41 42 socketTimeout time.Duration 43 44 resObjType reflect.Type 45 resObjMappings map[string][]int 46 selectCases []reflect.SelectCase 47 48 bc bufferedConn 49} 50 51var multiObjectParser func( 52 cmd *baseMultiCommand, 53 obj reflect.Value, 54 opCount int, 55 fieldCount int, 56 generation uint32, 57 expiration uint32, 58) error 59 60var prepareReflectionData func(cmd *baseMultiCommand) 61 62func newMultiCommand(node *Node, recordset *Recordset) *baseMultiCommand { 63 cmd := &baseMultiCommand{ 64 baseCommand: baseCommand{ 65 node: node, 66 oneShot: true, 67 }, 68 recordset: recordset, 69 } 70 71 if prepareReflectionData != nil { 72 prepareReflectionData(cmd) 73 } 74 return cmd 75} 76 77func newCorrectMultiCommand(node *Node, recordset *Recordset, namespace string, clusterKey int64, first bool) *baseMultiCommand { 78 cmd := &baseMultiCommand{ 79 baseCommand: baseCommand{ 80 node: node, 81 oneShot: true, 82 }, 83 namespace: namespace, 84 clusterKey: clusterKey, 85 first: first, 86 recordset: recordset, 87 } 88 89 if prepareReflectionData != nil { 90 prepareReflectionData(cmd) 91 } 92 return cmd 93} 94 95func (cmd *baseMultiCommand) getNode(ifc command) (*Node, error) { 96 return cmd.node, nil 97} 98 99func (cmd *baseMultiCommand) prepareRetry(ifc command, isTimeout bool) bool { 100 return false 101} 102 103func (cmd *baseMultiCommand) getConnection(policy Policy) (*Connection, error) { 104 return cmd.node.getConnectionWithHint(policy.GetBasePolicy().deadline(), policy.GetBasePolicy().socketTimeout(), byte(xrand.Int64()%256)) 105} 106 107func (cmd *baseMultiCommand) putConnection(conn *Connection) { 108 cmd.node.putConnectionWithHint(conn, byte(xrand.Int64()%256)) 109} 110 111func (cmd *baseMultiCommand) parseResult(ifc command, conn *Connection) error { 112 // Read socket into receive buffer one record at a time. Do not read entire receive size 113 // because the receive buffer would be too big. 114 status := true 115 116 var err error 117 118 cmd.bc = newBufferedConn(conn, 0) 119 for status { 120 if err := cmd.conn.initInflater(false, 0); err != nil { 121 return NewAerospikeError(PARSE_ERROR, "Error setting up zlib inflater:", err.Error()) 122 } 123 cmd.bc.reset(8) 124 125 // Read header. 126 if cmd.dataBuffer, err = cmd.bc.read(8); err != nil { 127 return err 128 } 129 130 proto := Buffer.BytesToInt64(cmd.dataBuffer, 0) 131 receiveSize := int(proto & 0xFFFFFFFFFFFF) 132 if receiveSize <= 0 { 133 continue 134 } 135 136 if compressedSize := cmd.compressedSize(); compressedSize > 0 { 137 cmd.bc.reset(8) 138 // Read header. 139 if cmd.dataBuffer, err = cmd.bc.read(8); err != nil { 140 return err 141 } 142 143 receiveSize = int(Buffer.BytesToInt64(cmd.dataBuffer, 0)) - 8 144 if err := cmd.conn.initInflater(true, compressedSize-8); err != nil { 145 return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Error setting up zlib inflater for size `%d`: %s", compressedSize-8, err.Error())) 146 } 147 148 // read the first 8 bytes 149 cmd.bc.reset(8) 150 if cmd.dataBuffer, err = cmd.bc.read(8); err != nil { 151 return err 152 } 153 } 154 155 // Validate header to make sure we are at the beginning of a message 156 proto = Buffer.BytesToInt64(cmd.dataBuffer, 0) 157 if err := cmd.validateHeader(proto); err != nil { 158 return err 159 } 160 161 if receiveSize > 0 { 162 cmd.bc.reset(receiveSize) 163 164 status, err = ifc.parseRecordResults(ifc, receiveSize) 165 if err != nil { 166 cmd.bc.drainConn() 167 return err 168 } 169 } else { 170 status = false 171 } 172 } 173 174 // if the buffer has been resized, put it back so that it will be reassigned to the connection. 175 cmd.dataBuffer = cmd.bc.buf() 176 177 return nil 178} 179 180func (cmd *baseMultiCommand) parseKey(fieldCount int) (*Key, error) { 181 var digest [20]byte 182 var namespace, setName string 183 var userKey Value 184 var err error 185 186 for i := 0; i < fieldCount; i++ { 187 if err = cmd.readBytes(4); err != nil { 188 return nil, err 189 } 190 191 fieldlen := int(Buffer.BytesToUint32(cmd.dataBuffer, 0)) 192 if err = cmd.readBytes(fieldlen); err != nil { 193 return nil, err 194 } 195 196 fieldtype := FieldType(cmd.dataBuffer[0]) 197 size := fieldlen - 1 198 199 switch fieldtype { 200 case DIGEST_RIPE: 201 copy(digest[:], cmd.dataBuffer[1:size+1]) 202 case NAMESPACE: 203 namespace = string(cmd.dataBuffer[1 : size+1]) 204 case TABLE: 205 setName = string(cmd.dataBuffer[1 : size+1]) 206 case KEY: 207 if userKey, err = bytesToKeyValue(int(cmd.dataBuffer[1]), cmd.dataBuffer, 2, size-1); err != nil { 208 return nil, err 209 } 210 } 211 } 212 213 return &Key{namespace: namespace, setName: setName, digest: digest, userKey: userKey}, nil 214} 215 216func (cmd *baseMultiCommand) readBytes(length int) (err error) { 217 // Corrupted data streams can result in a huge length. 218 // Do a sanity check here. 219 if length > MaxBufferSize || length < 0 { 220 return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid readBytes length: %d", length)) 221 } 222 223 cmd.dataBuffer, err = cmd.bc.read(length) 224 if err != nil { 225 return err 226 } 227 cmd.dataOffset += length 228 229 return nil 230} 231 232func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (bool, error) { 233 // Read/parse remaining message bytes one record at a time. 234 cmd.dataOffset = 0 235 236 for cmd.dataOffset < receiveSize { 237 if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil { 238 err = newNodeError(cmd.node, err) 239 return false, err 240 } 241 resultCode := ResultCode(cmd.dataBuffer[5] & 0xFF) 242 243 if resultCode != 0 { 244 if resultCode == KEY_NOT_FOUND_ERROR || resultCode == FILTERED_OUT { 245 return false, nil 246 } 247 err := NewAerospikeError(resultCode) 248 err = newNodeError(cmd.node, err) 249 return false, err 250 } 251 252 info3 := int(cmd.dataBuffer[3]) 253 254 // If cmd is the end marker of the response, do not proceed further 255 if (info3 & _INFO3_LAST) == _INFO3_LAST { 256 return false, nil 257 } 258 259 generation := Buffer.BytesToUint32(cmd.dataBuffer, 6) 260 expiration := TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10)) 261 fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18)) 262 opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20)) 263 264 key, err := cmd.parseKey(fieldCount) 265 if err != nil { 266 err = newNodeError(cmd.node, err) 267 return false, err 268 } 269 270 // if there is a recordset, process the record traditionally 271 // otherwise, it is supposed to be a record channel 272 if cmd.selectCases == nil { 273 // Parse bins. 274 var bins BinMap 275 276 for i := 0; i < opCount; i++ { 277 if err := cmd.readBytes(8); err != nil { 278 err = newNodeError(cmd.node, err) 279 return false, err 280 } 281 282 opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0)) 283 particleType := int(cmd.dataBuffer[5]) 284 nameSize := int(cmd.dataBuffer[7]) 285 286 if err := cmd.readBytes(nameSize); err != nil { 287 err = newNodeError(cmd.node, err) 288 return false, err 289 } 290 name := string(cmd.dataBuffer[:nameSize]) 291 292 particleBytesSize := opSize - (4 + nameSize) 293 if err = cmd.readBytes(particleBytesSize); err != nil { 294 err = newNodeError(cmd.node, err) 295 return false, err 296 } 297 value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize) 298 if err != nil { 299 err = newNodeError(cmd.node, err) 300 return false, err 301 } 302 303 if bins == nil { 304 bins = make(BinMap, opCount) 305 } 306 bins[name] = value 307 } 308 309 // If the channel is full and it blocks, we don't want this command to 310 // block forever, or panic in case the channel is closed in the meantime. 311 select { 312 // send back the result on the async channel 313 case cmd.recordset.Records <- newRecord(cmd.node, key, bins, generation, expiration): 314 case <-cmd.recordset.cancelled: 315 return false, NewAerospikeError(cmd.terminationErrorType) 316 } 317 } else if multiObjectParser != nil { 318 obj := reflect.New(cmd.resObjType) 319 if err := multiObjectParser(cmd, obj, opCount, fieldCount, generation, expiration); err != nil { 320 err = newNodeError(cmd.node, err) 321 return false, err 322 } 323 324 // set the object to send 325 cmd.selectCases[0].Send = obj 326 327 chosen, _, _ := reflect.Select(cmd.selectCases) 328 switch chosen { 329 case 0: // object sent 330 case 1: // cancel channel is closed 331 return false, NewAerospikeError(cmd.terminationErrorType) 332 } 333 } 334 } 335 336 return true, nil 337} 338 339func (cmd *baseMultiCommand) execute(ifc command, isRead bool) error { 340 341 /*************************************************************************** 342 IMPORTANT: No need to send the error here to the recordset.Error channel. 343 It is being sent from the downstream command from the result 344 returned from the function. 345 ****************************************************************************/ 346 347 if cmd.clusterKey != 0 { 348 if !cmd.first { 349 if err := queryValidate(cmd.node, cmd.namespace, cmd.clusterKey); err != nil { 350 return err 351 } 352 } 353 354 if err := cmd.baseCommand.execute(ifc, isRead); err != nil { 355 return err 356 } 357 358 return queryValidate(cmd.node, cmd.namespace, cmd.clusterKey) 359 } 360 361 return cmd.baseCommand.execute(ifc, isRead) 362} 363