1// Copyright 2013-2020 Aerospike, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aerospike 16 17import ( 18 "bytes" 19 "compress/zlib" 20 "encoding/binary" 21 "fmt" 22 "io" 23 "math" 24 "net" 25 "time" 26 27 . "github.com/aerospike/aerospike-client-go/logger" 28 . "github.com/aerospike/aerospike-client-go/types" 29 30 ParticleType "github.com/aerospike/aerospike-client-go/internal/particle_type" 31 Buffer "github.com/aerospike/aerospike-client-go/utils/buffer" 32) 33 34const ( 35 // Flags commented out are not supported by cmd client. 36 // Contains a read operation. 37 _INFO1_READ int = (1 << 0) 38 // Get all bins. 39 _INFO1_GET_ALL int = (1 << 1) 40 // Batch read or exists. 41 _INFO1_BATCH int = (1 << 3) 42 43 // Do not read the bins 44 _INFO1_NOBINDATA int = (1 << 5) 45 46 // Involve all replicas in read operation. 47 _INFO1_READ_MODE_AP_ALL = (1 << 6) 48 49 // Tell server to compress its response. 50 _INFO1_COMPRESS_RESPONSE = (1 << 7) 51 52 // Create or update record 53 _INFO2_WRITE int = (1 << 0) 54 // Fling a record into the belly of Moloch. 55 _INFO2_DELETE int = (1 << 1) 56 // Update if expected generation == old. 57 _INFO2_GENERATION int = (1 << 2) 58 // Update if new generation >= old, good for restore. 59 _INFO2_GENERATION_GT int = (1 << 3) 60 // Transaction resulting in record deletion leaves tombstone (Enterprise only). 61 _INFO2_DURABLE_DELETE int = (1 << 4) 62 // Create only. Fail if record already exists. 
63 _INFO2_CREATE_ONLY int = (1 << 5) 64 65 // Return a result for every operation. 66 _INFO2_RESPOND_ALL_OPS int = (1 << 7) 67 68 // This is the last of a multi-part message. 69 _INFO3_LAST int = (1 << 0) 70 // Commit to master only before declaring success. 71 _INFO3_COMMIT_MASTER int = (1 << 1) 72 // Update only. Merge bins. 73 _INFO3_UPDATE_ONLY int = (1 << 3) 74 75 // Create or completely replace record. 76 _INFO3_CREATE_OR_REPLACE int = (1 << 4) 77 // Completely replace existing record only. 78 _INFO3_REPLACE_ONLY int = (1 << 5) 79 // See Below 80 _INFO3_SC_READ_TYPE int = (1 << 6) 81 // See Below 82 _INFO3_SC_READ_RELAX int = (1 << 7) 83 84 // Interpret SC_READ bits in info3. 85 // 86 // RELAX TYPE 87 // strict 88 // ------ 89 // 0 0 sequential (default) 90 // 0 1 linearize 91 // 92 // relaxed 93 // ------- 94 // 1 0 allow replica 95 // 1 1 allow unavailable 96 97 _MSG_TOTAL_HEADER_SIZE uint8 = 30 98 _FIELD_HEADER_SIZE uint8 = 5 99 _OPERATION_HEADER_SIZE uint8 = 8 100 _MSG_REMAINING_HEADER_SIZE uint8 = 22 101 _DIGEST_SIZE uint8 = 20 102 _COMPRESS_THRESHOLD int = 128 103 _CL_MSG_VERSION int64 = 2 104 _AS_MSG_TYPE int64 = 3 105 _AS_MSG_TYPE_COMPRESSED int64 = 4 106) 107 108// command interface describes all commands available 109type command interface { 110 getPolicy(ifc command) Policy 111 112 writeBuffer(ifc command) error 113 getNode(ifc command) (*Node, error) 114 getConnection(policy Policy) (*Connection, error) 115 putConnection(conn *Connection) 116 parseResult(ifc command, conn *Connection) error 117 parseRecordResults(ifc command, receiveSize int) (bool, error) 118 prepareRetry(ifc command, isTimeout bool) bool 119 120 execute(ifc command, isRead bool) error 121 executeAt(ifc command, policy *BasePolicy, isRead bool, deadline time.Time, iterations, commandSentCounter int) error 122 123 // Executes the command 124 Execute() error 125} 126 127// Holds data buffer for the command 128type baseCommand struct { 129 node *Node 130 conn *Connection 131 132 // 
dataBufferCompress is not a second buffer; it is just a pointer to 133 // the beginning of the dataBuffer. 134 // To avoid allocating multiple buffers before compression, the dataBuffer 135 // will be referencing to a padded buffer. After the command is written to 136 // the buffer, this padding will be used to compress the command in-place, 137 // and then the compressed proto header will be written. 138 dataBufferCompress []byte 139 dataBuffer []byte 140 dataOffset int 141 142 // oneShot determines if streaming commands like query, scan or queryAggregate 143 // are not retried if they error out mid-parsing 144 oneShot bool 145 146 // will determine if the buffer will be compressed 147 // before being sent to the server 148 compressed bool 149} 150 151// Writes the command for write operations 152func (cmd *baseCommand) setWrite(policy *WritePolicy, operation OperationType, key *Key, bins []*Bin, binMap BinMap) error { 153 cmd.begin() 154 fieldCount, err := cmd.estimateKeySize(key, policy.SendKey) 155 if err != nil { 156 return err 157 } 158 159 predSize := 0 160 if len(policy.PredExp) > 0 { 161 predSize = cmd.estimatePredExpSize(policy.PredExp) 162 fieldCount++ 163 } 164 165 if binMap == nil { 166 for i := range bins { 167 if err := cmd.estimateOperationSizeForBin(bins[i]); err != nil { 168 return err 169 } 170 } 171 } else { 172 for name, value := range binMap { 173 if err := cmd.estimateOperationSizeForBinNameAndValue(name, value); err != nil { 174 return err 175 } 176 } 177 } 178 179 if err := cmd.sizeBuffer(policy.compress()); err != nil { 180 return err 181 } 182 183 if binMap == nil { 184 cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, len(bins)) 185 } else { 186 cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, len(binMap)) 187 } 188 189 cmd.writeKey(key, policy.SendKey) 190 191 if len(policy.PredExp) > 0 { 192 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 193 return err 194 } 195 } 196 197 if binMap == nil 
{ 198 for i := range bins { 199 if err := cmd.writeOperationForBin(bins[i], operation); err != nil { 200 return err 201 } 202 } 203 } else { 204 for name, value := range binMap { 205 if err := cmd.writeOperationForBinNameAndValue(name, value, operation); err != nil { 206 return err 207 } 208 } 209 } 210 211 cmd.end() 212 cmd.markCompressed(policy) 213 214 return nil 215} 216 217// Writes the command for delete operations 218func (cmd *baseCommand) setDelete(policy *WritePolicy, key *Key) error { 219 cmd.begin() 220 fieldCount, err := cmd.estimateKeySize(key, false) 221 if err != nil { 222 return err 223 } 224 225 predSize := 0 226 if len(policy.PredExp) > 0 { 227 predSize = cmd.estimatePredExpSize(policy.PredExp) 228 fieldCount++ 229 } 230 231 if err := cmd.sizeBuffer(policy.compress()); err != nil { 232 return err 233 } 234 cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE|_INFO2_DELETE, fieldCount, 0) 235 cmd.writeKey(key, false) 236 if len(policy.PredExp) > 0 { 237 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 238 return err 239 } 240 } 241 cmd.end() 242 cmd.markCompressed(policy) 243 244 return nil 245 246} 247 248// Writes the command for touch operations 249func (cmd *baseCommand) setTouch(policy *WritePolicy, key *Key) error { 250 cmd.begin() 251 fieldCount, err := cmd.estimateKeySize(key, policy.SendKey) 252 if err != nil { 253 return err 254 } 255 256 predSize := 0 257 if len(policy.PredExp) > 0 { 258 predSize = cmd.estimatePredExpSize(policy.PredExp) 259 fieldCount++ 260 } 261 262 cmd.estimateOperationSize() 263 if err := cmd.sizeBuffer(false); err != nil { 264 return err 265 } 266 cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, 1) 267 cmd.writeKey(key, policy.SendKey) 268 if len(policy.PredExp) > 0 { 269 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 270 return err 271 } 272 } 273 cmd.writeOperationForOperationType(_TOUCH) 274 cmd.end() 275 return nil 276 277} 278 279// Writes the command for exist 
operations 280func (cmd *baseCommand) setExists(policy *BasePolicy, key *Key) error { 281 cmd.begin() 282 fieldCount, err := cmd.estimateKeySize(key, false) 283 if err != nil { 284 return err 285 } 286 287 predSize := 0 288 if len(policy.PredExp) > 0 { 289 predSize = cmd.estimatePredExpSize(policy.PredExp) 290 fieldCount++ 291 } 292 293 if err := cmd.sizeBuffer(false); err != nil { 294 return err 295 } 296 cmd.writeHeader(policy, _INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 0) 297 cmd.writeKey(key, false) 298 if len(policy.PredExp) > 0 { 299 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 300 return err 301 } 302 } 303 cmd.end() 304 return nil 305 306} 307 308// Writes the command for get operations (all bins) 309func (cmd *baseCommand) setReadForKeyOnly(policy *BasePolicy, key *Key) error { 310 cmd.begin() 311 fieldCount, err := cmd.estimateKeySize(key, false) 312 if err != nil { 313 return err 314 } 315 predSize := 0 316 if len(policy.PredExp) > 0 { 317 predSize = cmd.estimatePredExpSize(policy.PredExp) 318 fieldCount++ 319 } 320 if err := cmd.sizeBuffer(false); err != nil { 321 return err 322 } 323 cmd.writeHeader(policy, _INFO1_READ|_INFO1_GET_ALL, 0, fieldCount, 0) 324 cmd.writeKey(key, false) 325 if len(policy.PredExp) > 0 { 326 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 327 return err 328 } 329 } 330 cmd.end() 331 return nil 332 333} 334 335// Writes the command for get operations (specified bins) 336func (cmd *baseCommand) setRead(policy *BasePolicy, key *Key, binNames []string) (err error) { 337 if len(binNames) > 0 { 338 cmd.begin() 339 fieldCount, err := cmd.estimateKeySize(key, false) 340 if err != nil { 341 return err 342 } 343 344 predSize := 0 345 if len(policy.PredExp) > 0 { 346 predSize = cmd.estimatePredExpSize(policy.PredExp) 347 fieldCount++ 348 } 349 350 for i := range binNames { 351 cmd.estimateOperationSizeForBinName(binNames[i]) 352 } 353 if err = cmd.sizeBuffer(false); err != nil { 354 return nil 
355 } 356 cmd.writeHeader(policy, _INFO1_READ, 0, fieldCount, len(binNames)) 357 cmd.writeKey(key, false) 358 359 if len(policy.PredExp) > 0 { 360 cmd.writePredExp(policy.PredExp, predSize) 361 } 362 363 for i := range binNames { 364 cmd.writeOperationForBinName(binNames[i], _READ) 365 } 366 cmd.end() 367 } else { 368 err = cmd.setReadForKeyOnly(policy, key) 369 } 370 371 return err 372} 373 374// Writes the command for getting metadata operations 375func (cmd *baseCommand) setReadHeader(policy *BasePolicy, key *Key) error { 376 cmd.begin() 377 fieldCount, err := cmd.estimateKeySize(key, false) 378 if err != nil { 379 return err 380 } 381 382 predSize := 0 383 if len(policy.PredExp) > 0 { 384 predSize = cmd.estimatePredExpSize(policy.PredExp) 385 fieldCount++ 386 } 387 388 cmd.estimateOperationSizeForBinName("") 389 if err := cmd.sizeBuffer(policy.compress()); err != nil { 390 return err 391 } 392 393 cmd.writeHeader(policy, _INFO1_READ|_INFO1_NOBINDATA, 0, fieldCount, 1) 394 395 cmd.writeKey(key, false) 396 if len(policy.PredExp) > 0 { 397 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 398 return err 399 } 400 } 401 cmd.writeOperationForBinName("", _READ) 402 cmd.end() 403 return nil 404 405} 406 407// Implements different command operations 408func (cmd *baseCommand) setOperate(policy *WritePolicy, key *Key, operations []*Operation) (bool, error) { 409 if len(operations) == 0 { 410 return false, NewAerospikeError(PARAMETER_ERROR, "No operations were passed.") 411 } 412 413 cmd.begin() 414 fieldCount := 0 415 readAttr := 0 416 writeAttr := 0 417 hasWrite := false 418 readBin := false 419 readHeader := false 420 RespondPerEachOp := policy.RespondPerEachOp 421 422 for i := range operations { 423 switch operations[i].opType { 424 case _BIT_READ: 425 fallthrough 426 case _HLL_READ: 427 fallthrough 428 case _MAP_READ: 429 // Map operations require RespondPerEachOp to be true. 430 RespondPerEachOp = true 431 // Fall through to read. 
432 fallthrough 433 case _READ, _CDT_READ: 434 if !operations[i].headerOnly { 435 readAttr |= _INFO1_READ 436 437 // Read all bins if no bin is specified. 438 if operations[i].binName == "" { 439 readAttr |= _INFO1_GET_ALL 440 } 441 readBin = true 442 } else { 443 readAttr |= _INFO1_READ 444 readHeader = true 445 } 446 case _BIT_MODIFY: 447 fallthrough 448 case _HLL_MODIFY: 449 fallthrough 450 case _MAP_MODIFY: 451 // Map operations require RespondPerEachOp to be true. 452 RespondPerEachOp = true 453 // Fall through to default. 454 fallthrough 455 default: 456 writeAttr = _INFO2_WRITE 457 hasWrite = true 458 } 459 cmd.estimateOperationSizeForOperation(operations[i]) 460 } 461 462 ksz, err := cmd.estimateKeySize(key, policy.SendKey && hasWrite) 463 if err != nil { 464 return hasWrite, err 465 } 466 fieldCount += ksz 467 468 predSize := 0 469 if len(policy.PredExp) > 0 { 470 predSize = cmd.estimatePredExpSize(policy.PredExp) 471 fieldCount++ 472 } 473 474 if err := cmd.sizeBuffer(policy.compress()); err != nil { 475 return hasWrite, err 476 } 477 478 if readHeader && !readBin { 479 readAttr |= _INFO1_NOBINDATA 480 } 481 482 if RespondPerEachOp { 483 writeAttr |= _INFO2_RESPOND_ALL_OPS 484 } 485 486 if writeAttr != 0 { 487 cmd.writeHeaderWithPolicy(policy, readAttr, writeAttr, fieldCount, len(operations)) 488 } else { 489 cmd.writeHeader(&policy.BasePolicy, readAttr, writeAttr, fieldCount, len(operations)) 490 } 491 cmd.writeKey(key, policy.SendKey && hasWrite) 492 493 if len(policy.PredExp) > 0 { 494 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 495 return hasWrite, err 496 } 497 } 498 499 for _, operation := range operations { 500 if err := cmd.writeOperationForOperation(operation); err != nil { 501 return hasWrite, err 502 } 503 } 504 505 cmd.end() 506 cmd.markCompressed(policy) 507 508 return hasWrite, nil 509} 510 511func (cmd *baseCommand) setUdf(policy *WritePolicy, key *Key, packageName string, functionName string, args *ValueArray) error 
{ 512 cmd.begin() 513 fieldCount, err := cmd.estimateKeySize(key, policy.SendKey) 514 if err != nil { 515 return err 516 } 517 518 predSize := 0 519 if len(policy.PredExp) > 0 { 520 predSize = cmd.estimatePredExpSize(policy.PredExp) 521 fieldCount++ 522 } 523 524 fc, err := cmd.estimateUdfSize(packageName, functionName, args) 525 if err != nil { 526 return err 527 } 528 fieldCount += fc 529 530 if err := cmd.sizeBuffer(policy.compress()); err != nil { 531 return err 532 } 533 534 cmd.writeHeaderWithPolicy(policy, 0, _INFO2_WRITE, fieldCount, 0) 535 cmd.writeKey(key, policy.SendKey) 536 if len(policy.PredExp) > 0 { 537 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 538 return err 539 } 540 } 541 cmd.writeFieldString(packageName, UDF_PACKAGE_NAME) 542 cmd.writeFieldString(functionName, UDF_FUNCTION) 543 cmd.writeUdfArgs(args) 544 cmd.end() 545 cmd.markCompressed(policy) 546 547 return nil 548} 549 550func (cmd *baseCommand) setBatchIndexReadCompat(policy *BatchPolicy, keys []*Key, batch *batchNode, binNames []string, readAttr int) error { 551 offsets := batch.offsets 552 max := len(batch.offsets) 553 fieldCountRow := 1 554 if policy.SendSetName { 555 fieldCountRow = 2 556 } 557 558 binNameSize := 0 559 operationCount := len(binNames) 560 for _, binName := range binNames { 561 binNameSize += len(binName) + int(_OPERATION_HEADER_SIZE) 562 } 563 564 // Estimate buffer size 565 cmd.begin() 566 fieldCount := 1 567 predSize := 0 568 if len(policy.PredExp) > 0 { 569 predSize = cmd.estimatePredExpSize(policy.PredExp) 570 fieldCount++ 571 } 572 573 cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 5 574 575 var prev *Key 576 for i := 0; i < max; i++ { 577 key := keys[offsets[i]] 578 cmd.dataOffset += len(key.digest) + 4 579 580 // Try reference equality in hope that namespace/set for all keys is set from fixed variables. 
581 if prev != nil && prev.namespace == key.namespace && 582 (!policy.SendSetName || prev.setName == key.setName) { 583 // Can set repeat previous namespace/bin names to save space. 584 cmd.dataOffset++ 585 } else { 586 // Must write full header and namespace/set/bin names. 587 cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE) + 6 588 589 if policy.SendSetName { 590 cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE) 591 } 592 cmd.dataOffset += binNameSize 593 prev = key 594 } 595 } 596 597 if err := cmd.sizeBuffer(policy.compress()); err != nil { 598 return err 599 } 600 601 if policy.ReadModeAP == ReadModeAPAll { 602 readAttr |= _INFO1_READ_MODE_AP_ALL 603 } 604 605 if len(binNames) == 0 { 606 readAttr |= _INFO1_GET_ALL 607 } 608 609 cmd.writeHeader(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, fieldCount, 0) 610 611 if len(policy.PredExp) > 0 { 612 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 613 return err 614 } 615 } 616 617 // Write real field size. 618 fieldSizeOffset := cmd.dataOffset 619 if policy.SendSetName { 620 cmd.writeFieldHeader(0, BATCH_INDEX_WITH_SET) 621 } else { 622 cmd.writeFieldHeader(0, BATCH_INDEX) 623 } 624 625 cmd.WriteUint32(uint32(max)) 626 627 if policy.AllowInline { 628 cmd.WriteByte(1) 629 } else { 630 cmd.WriteByte(0) 631 } 632 633 prev = nil 634 for i := 0; i < max; i++ { 635 index := offsets[i] 636 cmd.WriteUint32(uint32(index)) 637 638 key := keys[index] 639 cmd.Write(key.digest[:]) 640 // Try reference equality in hope that namespace/set for all keys is set from fixed variables. 641 if prev != nil && prev.namespace == key.namespace && 642 (!policy.SendSetName || prev.setName == key.setName) { 643 // Can set repeat previous namespace/bin names to save space. 644 cmd.WriteByte(1) // repeat 645 } else { 646 // Write full header, namespace and bin names. 
647 cmd.WriteByte(0) // do not repeat 648 cmd.WriteByte(byte(readAttr)) 649 cmd.WriteUint16(uint16(fieldCountRow)) 650 cmd.WriteUint16(uint16(operationCount)) 651 cmd.writeFieldString(key.namespace, NAMESPACE) 652 653 if policy.SendSetName { 654 cmd.writeFieldString(key.setName, TABLE) 655 } 656 657 for _, binName := range binNames { 658 cmd.writeOperationForBinName(binName, _READ) 659 } 660 661 prev = key 662 } 663 } 664 665 cmd.WriteUint32At(uint32(cmd.dataOffset)-uint32(_MSG_TOTAL_HEADER_SIZE)-4, fieldSizeOffset) 666 cmd.end() 667 cmd.markCompressed(policy) 668 669 return nil 670} 671 672func (cmd *baseCommand) setBatchIndexRead(policy *BatchPolicy, records []*BatchRead, batch *batchNode) error { 673 offsets := batch.offsets 674 max := len(batch.offsets) 675 fieldCountRow := 1 676 if policy.SendSetName { 677 fieldCountRow = 2 678 } 679 680 // Estimate buffer size 681 cmd.begin() 682 fieldCount := 1 683 predSize := 0 684 if len(policy.PredExp) > 0 { 685 predSize = cmd.estimatePredExpSize(policy.PredExp) 686 fieldCount++ 687 } 688 689 cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 5 690 691 var prev *BatchRead 692 for i := 0; i < max; i++ { 693 record := records[offsets[i]] 694 key := record.Key 695 binNames := record.BinNames 696 697 cmd.dataOffset += len(key.digest) + 4 698 699 // Try reference equality in hope that namespace/set for all keys is set from fixed variables. 700 if prev != nil && prev.Key.namespace == key.namespace && 701 (!policy.SendSetName || prev.Key.setName == key.setName) && 702 &prev.BinNames == &binNames && prev.ReadAllBins == record.ReadAllBins { 703 // Can set repeat previous namespace/bin names to save space. 704 cmd.dataOffset++ 705 } else { 706 // Must write full header and namespace/set/bin names. 
707 cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE) + 6 708 709 if policy.SendSetName { 710 cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE) 711 } 712 713 if len(binNames) != 0 { 714 for _, binName := range binNames { 715 cmd.estimateOperationSizeForBinName(binName) 716 } 717 } 718 719 prev = record 720 } 721 } 722 723 if err := cmd.sizeBuffer(policy.compress()); err != nil { 724 return err 725 } 726 727 readAttr := _INFO1_READ 728 if policy.ReadModeAP == ReadModeAPAll { 729 readAttr |= _INFO1_READ_MODE_AP_ALL 730 } 731 732 cmd.writeHeader(&policy.BasePolicy, readAttr|_INFO1_BATCH, 0, fieldCount, 0) 733 // cmd.writeHeader(&policy.BasePolicy, _INFO1_READ|_INFO1_BATCH, 0, 1, 0) 734 735 if len(policy.PredExp) > 0 { 736 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 737 return err 738 } 739 } 740 741 // Write real field size. 742 fieldSizeOffset := cmd.dataOffset 743 if policy.SendSetName { 744 cmd.writeFieldHeader(0, BATCH_INDEX_WITH_SET) 745 } else { 746 cmd.writeFieldHeader(0, BATCH_INDEX) 747 } 748 749 cmd.WriteUint32(uint32(max)) 750 751 if policy.AllowInline { 752 cmd.WriteByte(1) 753 } else { 754 cmd.WriteByte(0) 755 } 756 757 prev = nil 758 for i := 0; i < max; i++ { 759 index := offsets[i] 760 cmd.WriteUint32(uint32(index)) 761 762 record := records[index] 763 key := record.Key 764 binNames := record.BinNames 765 cmd.Write(key.digest[:]) 766 767 // Try reference equality in hope that namespace/set for all keys is set from fixed variables. 768 if prev != nil && prev.Key.namespace == key.namespace && 769 (!policy.SendSetName || prev.Key.setName == key.setName) && 770 &prev.BinNames == &binNames && prev.ReadAllBins == record.ReadAllBins { 771 // Can set repeat previous namespace/bin names to save space. 772 cmd.WriteByte(1) // repeat 773 } else { 774 // Write full header, namespace and bin names. 
775 cmd.WriteByte(0) // do not repeat 776 if len(binNames) > 0 { 777 cmd.WriteByte(byte(readAttr)) 778 cmd.WriteUint16(uint16(fieldCountRow)) 779 cmd.WriteUint16(uint16(len(binNames))) 780 cmd.writeFieldString(key.namespace, NAMESPACE) 781 782 if policy.SendSetName { 783 cmd.writeFieldString(key.setName, TABLE) 784 } 785 786 for _, binName := range binNames { 787 cmd.writeOperationForBinName(binName, _READ) 788 } 789 } else { 790 attr := byte(readAttr) 791 if record.ReadAllBins { 792 attr |= byte(_INFO1_GET_ALL) 793 } else { 794 attr |= byte(_INFO1_NOBINDATA) 795 } 796 cmd.WriteByte(attr) 797 798 cmd.WriteUint16(uint16(fieldCountRow)) 799 cmd.WriteUint16(0) 800 cmd.writeFieldString(key.namespace, NAMESPACE) 801 802 if policy.SendSetName { 803 cmd.writeFieldString(key.setName, TABLE) 804 } 805 } 806 807 prev = record 808 } 809 } 810 811 cmd.WriteUint32At(uint32(cmd.dataOffset)-uint32(_MSG_TOTAL_HEADER_SIZE)-4, fieldSizeOffset) 812 cmd.end() 813 cmd.markCompressed(policy) 814 815 return nil 816} 817 818func (cmd *baseCommand) setScan(policy *ScanPolicy, namespace *string, setName *string, binNames []string, taskID uint64) error { 819 cmd.begin() 820 fieldCount := 0 821 822 predSize := 0 823 if len(policy.PredExp) > 0 { 824 predSize = cmd.estimatePredExpSize(policy.PredExp) 825 fieldCount++ 826 } 827 828 if namespace != nil { 829 cmd.dataOffset += len(*namespace) + int(_FIELD_HEADER_SIZE) 830 fieldCount++ 831 } 832 833 if setName != nil { 834 cmd.dataOffset += len(*setName) + int(_FIELD_HEADER_SIZE) 835 fieldCount++ 836 } 837 838 if policy.RecordsPerSecond > 0 { 839 cmd.dataOffset += 4 + int(_FIELD_HEADER_SIZE) 840 fieldCount++ 841 } 842 843 // Estimate scan options size. 844 cmd.dataOffset += 2 + int(_FIELD_HEADER_SIZE) 845 fieldCount++ 846 847 // Estimate scan timeout size. 848 cmd.dataOffset += 4 + int(_FIELD_HEADER_SIZE) 849 fieldCount++ 850 851 // Allocate space for TaskId field. 
852 cmd.dataOffset += 8 + int(_FIELD_HEADER_SIZE) 853 fieldCount++ 854 855 if binNames != nil { 856 for i := range binNames { 857 cmd.estimateOperationSizeForBinName(binNames[i]) 858 } 859 } 860 861 if err := cmd.sizeBuffer(false); err != nil { 862 return err 863 } 864 readAttr := _INFO1_READ 865 866 if !policy.IncludeBinData { 867 readAttr |= _INFO1_NOBINDATA 868 } 869 870 operationCount := 0 871 if binNames != nil { 872 operationCount = len(binNames) 873 } 874 cmd.writeHeader(&policy.BasePolicy, readAttr, 0, fieldCount, operationCount) 875 876 if namespace != nil { 877 cmd.writeFieldString(*namespace, NAMESPACE) 878 } 879 880 if setName != nil { 881 cmd.writeFieldString(*setName, TABLE) 882 } 883 884 if len(policy.PredExp) > 0 { 885 if err := cmd.writePredExp(policy.PredExp, predSize); err != nil { 886 return err 887 } 888 } 889 890 if policy.RecordsPerSecond > 0 { 891 cmd.writeFieldInt32(int32(policy.RecordsPerSecond), RECORDS_PER_SECOND) 892 } 893 894 cmd.writeFieldHeader(2, SCAN_OPTIONS) 895 priority := byte(policy.Priority) 896 priority <<= 4 897 898 if policy.FailOnClusterChange { 899 priority |= 0x08 900 } 901 902 cmd.WriteByte(priority) 903 cmd.WriteByte(byte(policy.ScanPercent)) 904 905 // Write scan timeout 906 cmd.writeFieldHeader(4, SCAN_TIMEOUT) 907 cmd.WriteInt32(int32(policy.SocketTimeout / time.Millisecond)) // in milliseconds 908 909 cmd.writeFieldHeader(8, TRAN_ID) 910 cmd.WriteUint64(taskID) 911 912 if binNames != nil { 913 for i := range binNames { 914 cmd.writeOperationForBinName(binNames[i], _READ) 915 } 916 } 917 918 cmd.end() 919 920 return nil 921} 922 923func (cmd *baseCommand) setQuery(policy *QueryPolicy, wpolicy *WritePolicy, statement *Statement, operations []*Operation, write bool) (err error) { 924 fieldCount := 0 925 filterSize := 0 926 binNameSize := 0 927 predSize := 0 928 predExp := statement.predExps 929 930 recordsPerSecond := 0 931 if !write { 932 recordsPerSecond = policy.RecordsPerSecond 933 } 934 935 cmd.begin() 936 937 if 
statement.Namespace != "" { 938 cmd.dataOffset += len(statement.Namespace) + int(_FIELD_HEADER_SIZE) 939 fieldCount++ 940 } 941 942 if statement.IndexName != "" { 943 cmd.dataOffset += len(statement.IndexName) + int(_FIELD_HEADER_SIZE) 944 fieldCount++ 945 } 946 947 if statement.SetName != "" { 948 cmd.dataOffset += len(statement.SetName) + int(_FIELD_HEADER_SIZE) 949 fieldCount++ 950 } 951 952 // Allocate space for TaskId field. 953 cmd.dataOffset += 8 + int(_FIELD_HEADER_SIZE) 954 fieldCount++ 955 956 if statement.Filter != nil { 957 idxType := statement.Filter.IndexCollectionType() 958 959 if idxType != ICT_DEFAULT { 960 cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 1 961 fieldCount++ 962 } 963 964 cmd.dataOffset += int(_FIELD_HEADER_SIZE) 965 filterSize++ // num filters 966 967 sz, err := statement.Filter.EstimateSize() 968 if err != nil { 969 return err 970 } 971 filterSize += sz 972 973 cmd.dataOffset += filterSize 974 fieldCount++ 975 976 // Query bin names are specified as a field (Scan bin names are specified later as operations) 977 if len(statement.BinNames) > 0 { 978 cmd.dataOffset += int(_FIELD_HEADER_SIZE) 979 binNameSize++ // num bin names 980 981 for _, binName := range statement.BinNames { 982 binNameSize += len(binName) + 1 983 } 984 cmd.dataOffset += binNameSize 985 fieldCount++ 986 } 987 } else { 988 // Calling query with no filters is more efficiently handled by a primary index scan. 989 // Estimate scan options size. 990 cmd.dataOffset += (2 + int(_FIELD_HEADER_SIZE)) 991 fieldCount++ 992 993 // Estimate scan timeout size. 994 cmd.dataOffset += (4 + int(_FIELD_HEADER_SIZE)) 995 fieldCount++ 996 997 // Estimate records per second size. 
998 if recordsPerSecond > 0 { 999 cmd.dataOffset += 4 + int(_FIELD_HEADER_SIZE) 1000 fieldCount++ 1001 } 1002 } 1003 1004 if len(policy.PredExp) > 0 && len(predExp) == 0 { 1005 predExp = policy.PredExp 1006 } 1007 1008 if len(predExp) > 0 { 1009 predSize = cmd.estimatePredExpSize(predExp) 1010 fieldCount++ 1011 } 1012 1013 var functionArgs *ValueArray 1014 if statement.functionName != "" { 1015 cmd.dataOffset += int(_FIELD_HEADER_SIZE) + 1 // udf type 1016 cmd.dataOffset += len(statement.packageName) + int(_FIELD_HEADER_SIZE) 1017 cmd.dataOffset += len(statement.functionName) + int(_FIELD_HEADER_SIZE) 1018 1019 fasz := 0 1020 if len(statement.functionArgs) > 0 { 1021 functionArgs = NewValueArray(statement.functionArgs) 1022 fasz, err = functionArgs.EstimateSize() 1023 if err != nil { 1024 return err 1025 } 1026 } 1027 1028 cmd.dataOffset += int(_FIELD_HEADER_SIZE) + fasz 1029 fieldCount += 4 1030 } 1031 1032 // Operations (used in query execute) and bin names (used in scan/query) are mutually exclusive. 
1033 if len(operations) > 0 { 1034 for _, op := range operations { 1035 cmd.estimateOperationSizeForOperation(op) 1036 } 1037 } else if len(statement.BinNames) > 0 && statement.Filter == nil { 1038 for _, binName := range statement.BinNames { 1039 cmd.estimateOperationSizeForBinName(binName) 1040 } 1041 } 1042 1043 if err := cmd.sizeBuffer(false); err != nil { 1044 return err 1045 } 1046 1047 operationCount := 0 1048 if len(operations) > 0 { 1049 operationCount = len(operations) 1050 } else if statement.Filter == nil && len(statement.BinNames) > 0 { 1051 operationCount = len(statement.BinNames) 1052 } 1053 1054 if write { 1055 cmd.writeHeaderWithPolicy(wpolicy, 0, _INFO2_WRITE, fieldCount, operationCount) 1056 } else { 1057 readAttr := _INFO1_READ | _INFO1_NOBINDATA 1058 if policy.IncludeBinData { 1059 readAttr = _INFO1_READ 1060 } 1061 cmd.writeHeader(&policy.BasePolicy, readAttr, 0, fieldCount, operationCount) 1062 } 1063 1064 if statement.Namespace != "" { 1065 cmd.writeFieldString(statement.Namespace, NAMESPACE) 1066 } 1067 1068 if statement.IndexName != "" { 1069 cmd.writeFieldString(statement.IndexName, INDEX_NAME) 1070 } 1071 1072 if statement.SetName != "" { 1073 cmd.writeFieldString(statement.SetName, TABLE) 1074 } 1075 1076 cmd.writeFieldHeader(8, TRAN_ID) 1077 cmd.WriteUint64(statement.TaskId) 1078 1079 if statement.Filter != nil { 1080 idxType := statement.Filter.IndexCollectionType() 1081 1082 if idxType != ICT_DEFAULT { 1083 cmd.writeFieldHeader(1, INDEX_TYPE) 1084 cmd.WriteByte(byte(idxType)) 1085 } 1086 1087 cmd.writeFieldHeader(filterSize, INDEX_RANGE) 1088 cmd.WriteByte(byte(1)) // number of filters 1089 1090 _, err := statement.Filter.write(cmd) 1091 if err != nil { 1092 return err 1093 } 1094 1095 if len(statement.BinNames) > 0 { 1096 cmd.writeFieldHeader(binNameSize, QUERY_BINLIST) 1097 cmd.WriteByte(byte(len(statement.BinNames))) 1098 1099 for _, binName := range statement.BinNames { 1100 len := copy(cmd.dataBuffer[cmd.dataOffset+1:], binName) 
				cmd.dataBuffer[cmd.dataOffset] = byte(len)
				cmd.dataOffset += len + 1
			}
		}
	} else {
		// Calling query with no filters is more efficiently handled by a primary index scan.
		cmd.writeFieldHeader(2, SCAN_OPTIONS)
		priority := byte(policy.Priority)
		priority <<= 4

		if !write && policy.FailOnClusterChange {
			priority |= 0x08
		}

		cmd.WriteByte(priority)
		// Scan percent is fixed at 100 for queries.
		cmd.WriteByte(byte(100))

		// Write scan timeout
		cmd.writeFieldHeader(4, SCAN_TIMEOUT)
		cmd.WriteInt32(int32(policy.SocketTimeout / time.Millisecond)) // in milliseconds

		// Write records per second.
		if recordsPerSecond > 0 {
			cmd.writeFieldInt32(int32(recordsPerSecond), RECORDS_PER_SECOND)
		}
	}

	// Append predicate expressions, if any.
	if len(predExp) > 0 {
		if err := cmd.writePredExp(predExp, predSize); err != nil {
			return err
		}
	}

	// Append UDF invocation data: op mode, package, function and arguments.
	if statement.functionName != "" {
		cmd.writeFieldHeader(1, UDF_OP)
		// 1 = return the UDF result, 2 = discard it.
		if statement.returnData {
			cmd.dataBuffer[cmd.dataOffset] = byte(1)
		} else {
			cmd.dataBuffer[cmd.dataOffset] = byte(2)
		}
		cmd.dataOffset++

		cmd.writeFieldString(statement.packageName, UDF_PACKAGE_NAME)
		cmd.writeFieldString(statement.functionName, UDF_FUNCTION)
		cmd.writeUdfArgs(functionArgs)
	}

	// Explicit operations take precedence over bare bin names.
	if len(operations) > 0 {
		for _, op := range operations {
			cmd.writeOperationForOperation(op)
		}
	} else if len(statement.BinNames) > 0 && statement.Filter == nil {
		// scan binNames come last
		for _, binName := range statement.BinNames {
			cmd.writeOperationForBinName(binName, _READ)
		}
	}

	cmd.end()

	return nil
}

// estimateKeySize advances the buffer offset by the wire size of the key
// (namespace, set name, digest and, optionally, the user key) and returns
// the number of fields that will be written.
func (cmd *baseCommand) estimateKeySize(key *Key, sendKey bool) (int, error) {
	fieldCount := 0

	if key.namespace != "" {
		cmd.dataOffset += len(key.namespace) + int(_FIELD_HEADER_SIZE)
		fieldCount++
	}

	if key.setName != "" {
		cmd.dataOffset += len(key.setName) + int(_FIELD_HEADER_SIZE)
		fieldCount++
	}

	// The digest field is always sent.
	cmd.dataOffset += int(_DIGEST_SIZE + _FIELD_HEADER_SIZE)
	fieldCount++

	if sendKey {
		// field header size + key size + 1 byte for the value's particle type
		sz, err := key.userKey.EstimateSize()
		if err != nil {
			return sz, err
		}
		cmd.dataOffset += sz + int(_FIELD_HEADER_SIZE) + 1
		fieldCount++
	}

	return fieldCount, nil
}

// estimateUdfSize advances the buffer offset by the wire size of a UDF call
// (package name, function name and argument list) and returns the number of
// fields that will be written (always 3).
func (cmd *baseCommand) estimateUdfSize(packageName string, functionName string, args *ValueArray) (int, error) {
	cmd.dataOffset += len(packageName) + int(_FIELD_HEADER_SIZE)
	cmd.dataOffset += len(functionName) + int(_FIELD_HEADER_SIZE)

	sz, err := args.EstimateSize()
	if err != nil {
		return 0, err
	}

	cmd.dataOffset += sz + int(_FIELD_HEADER_SIZE)
	return 3, nil
}

// estimateOperationSizeForBin advances the buffer offset by the wire size of
// an operation on the given bin (operation header + name + value).
func (cmd *baseCommand) estimateOperationSizeForBin(bin *Bin) error {
	cmd.dataOffset += len(bin.Name) + int(_OPERATION_HEADER_SIZE)
	sz, err := bin.Value.EstimateSize()
	if err != nil {
		return err
	}
	cmd.dataOffset += sz
	return nil
}

// estimateOperationSizeForBinNameAndValue advances the buffer offset by the
// wire size of an operation on the given bin name and raw value.
func (cmd *baseCommand) estimateOperationSizeForBinNameAndValue(name string, value interface{}) error {
	cmd.dataOffset += len(name) + int(_OPERATION_HEADER_SIZE)
	sz, err := NewValue(value).EstimateSize()
	if err != nil {
		return err
	}
	cmd.dataOffset += sz
	return nil
}

// estimateOperationSizeForOperation advances the buffer offset by the wire
// size of the given operation. Operations with a custom encoder are sized by
// invoking the encoder with a nil target buffer.
func (cmd *baseCommand) estimateOperationSizeForOperation(operation *Operation) error {
	binLen := len(operation.binName)
	cmd.dataOffset += binLen + int(_OPERATION_HEADER_SIZE)

	if operation.encoder == nil {
		if operation.binValue != nil {
			sz, err := operation.binValue.EstimateSize()
			if err != nil {
				return err
			}
			cmd.dataOffset += sz
		}
	} else {
		// A nil buffer asks the encoder for the encoded size only.
		sz, err := operation.encoder(operation, nil)
		if err != nil {
			return err
		}
		cmd.dataOffset += sz
	}
	return nil
}

// estimateOperationSizeForBinName advances the buffer offset by the wire
// size of an operation that carries only a bin name.
func (cmd *baseCommand) estimateOperationSizeForBinName(binName string) {
	cmd.dataOffset += len(binName) + int(_OPERATION_HEADER_SIZE)
}

// estimateOperationSize advances the buffer offset by the size of a bare
// operation header (no bin name, no value).
func (cmd *baseCommand) estimateOperationSize() {
	cmd.dataOffset += int(_OPERATION_HEADER_SIZE)
}

// estimatePredExpSize advances the buffer offset by the marshaled size of
// the predicate expressions plus one field header, and returns the size
// excluding the header.
func (cmd *baseCommand) estimatePredExpSize(predExp []PredExp) int {
	sz := 0
	for _, predexp := range predExp {
		sz += predexp.marshaledSize()
	}
	cmd.dataOffset += sz + int(_FIELD_HEADER_SIZE)
	return sz
}

// Generic header write. Fills in the message header for read commands;
// the 8-byte total-size proto header (bytes 0-7) is written later by end().
func (cmd *baseCommand) writeHeader(policy *BasePolicy, readAttr int, writeAttr int, fieldCount int, operationCount int) {
	infoAttr := 0

	switch policy.ReadModeSC {
	case ReadModeSCSession:
	case ReadModeSCLinearize:
		infoAttr |= _INFO3_SC_READ_TYPE
	case ReadModeSCAllowReplica:
		infoAttr |= _INFO3_SC_READ_RELAX
	case ReadModeSCAllowUnavailable:
		infoAttr |= _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
	}

	if policy.ReadModeAP == ReadModeAPAll {
		readAttr |= _INFO1_READ_MODE_AP_ALL
	}

	if policy.UseCompression {
		readAttr |= _INFO1_COMPRESS_RESPONSE
	}

	// Write all header data except total size which must be written last.
	cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length.
	cmd.dataBuffer[9] = byte(readAttr)
	cmd.dataBuffer[10] = byte(writeAttr)
	cmd.dataBuffer[11] = byte(infoAttr)

	// Zero the unused bytes (result code, generation, expiration, timeout).
	for i := 12; i < 26; i++ {
		cmd.dataBuffer[i] = 0
	}
	cmd.dataOffset = 26
	cmd.WriteInt16(int16(fieldCount))
	cmd.WriteInt16(int16(operationCount))
	cmd.dataOffset = int(_MSG_TOTAL_HEADER_SIZE)
}

// Header write for write operations. Like writeHeader, but also encodes the
// record-exists action, generation policy, commit level, durable delete,
// generation and expiration from the write policy.
func (cmd *baseCommand) writeHeaderWithPolicy(policy *WritePolicy, readAttr int, writeAttr int, fieldCount int, operationCount int) {
	// Set flags.
	generation := uint32(0)
	infoAttr := 0

	switch policy.RecordExistsAction {
	case UPDATE:
	case UPDATE_ONLY:
		infoAttr |= _INFO3_UPDATE_ONLY
	case REPLACE:
		infoAttr |= _INFO3_CREATE_OR_REPLACE
	case REPLACE_ONLY:
		infoAttr |= _INFO3_REPLACE_ONLY
	case CREATE_ONLY:
		writeAttr |= _INFO2_CREATE_ONLY
	}

	switch policy.GenerationPolicy {
	case NONE:
	case EXPECT_GEN_EQUAL:
		generation = policy.Generation
		writeAttr |= _INFO2_GENERATION
	case EXPECT_GEN_GT:
		generation = policy.Generation
		writeAttr |= _INFO2_GENERATION_GT
	}

	if policy.CommitLevel == COMMIT_MASTER {
		infoAttr |= _INFO3_COMMIT_MASTER
	}

	if policy.DurableDelete {
		writeAttr |= _INFO2_DURABLE_DELETE
	}

	switch policy.ReadModeSC {
	case ReadModeSCSession:
	case ReadModeSCLinearize:
		infoAttr |= _INFO3_SC_READ_TYPE
	case ReadModeSCAllowReplica:
		infoAttr |= _INFO3_SC_READ_RELAX
	case ReadModeSCAllowUnavailable:
		infoAttr |= _INFO3_SC_READ_TYPE | _INFO3_SC_READ_RELAX
	}

	if policy.ReadModeAP == ReadModeAPAll {
		readAttr |= _INFO1_READ_MODE_AP_ALL
	}

	if policy.UseCompression {
		readAttr |= _INFO1_COMPRESS_RESPONSE
	}

	// Write all header data except total size which must be written last.
	cmd.dataBuffer[8] = _MSG_REMAINING_HEADER_SIZE // Message header length.
	cmd.dataBuffer[9] = byte(readAttr)
	cmd.dataBuffer[10] = byte(writeAttr)
	cmd.dataBuffer[11] = byte(infoAttr)
	cmd.dataBuffer[12] = 0 // unused
	cmd.dataBuffer[13] = 0 // clear the result code
	cmd.dataOffset = 14
	cmd.WriteUint32(generation)
	cmd.dataOffset = 18
	cmd.WriteUint32(policy.Expiration)

	// Initialize timeout. It will be written later.
	cmd.dataBuffer[22] = 0
	cmd.dataBuffer[23] = 0
	cmd.dataBuffer[24] = 0
	cmd.dataBuffer[25] = 0

	cmd.dataOffset = 26
	cmd.WriteInt16(int16(fieldCount))
	cmd.WriteInt16(int16(operationCount))
	cmd.dataOffset = int(_MSG_TOTAL_HEADER_SIZE)
}

// writeKey writes the key's fields (namespace, set, digest and, optionally,
// the user key) into the buffer.
func (cmd *baseCommand) writeKey(key *Key, sendKey bool) {
	// Write key into buffer.
	if key.namespace != "" {
		cmd.writeFieldString(key.namespace, NAMESPACE)
	}

	if key.setName != "" {
		cmd.writeFieldString(key.setName, TABLE)
	}

	cmd.writeFieldBytes(key.digest[:], DIGEST_RIPE)

	if sendKey {
		cmd.writeFieldValue(key.userKey, KEY)
	}
}

// writeOperationForBin writes one operation (header, bin name and value)
// for the given bin. The name is copied past the header first so its actual
// byte length is known before the header is written.
func (cmd *baseCommand) writeOperationForBin(bin *Bin, operation OperationType) error {
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], bin.Name)

	valueLength, err := bin.Value.EstimateSize()
	if err != nil {
		return err
	}

	cmd.WriteInt32(int32(nameLength + valueLength + 4))
	cmd.WriteByte((operation.op))
	cmd.WriteByte((byte(bin.Value.GetType())))
	cmd.WriteByte((byte(0)))
	cmd.WriteByte((byte(nameLength)))
	cmd.dataOffset += nameLength
	_, err = bin.Value.write(cmd)
	return err
}

// writeOperationForBinNameAndValue writes one operation for the given bin
// name and raw value (wrapped via NewValue).
func (cmd *baseCommand) writeOperationForBinNameAndValue(name string, val interface{}, operation OperationType) error {
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], name)

	v := NewValue(val)

	valueLength, err := v.EstimateSize()
	if err != nil {
		return err
	}

	cmd.WriteInt32(int32(nameLength + valueLength + 4))
	cmd.WriteByte((operation.op))
	cmd.WriteByte((byte(v.GetType())))
	cmd.WriteByte((byte(0)))
	cmd.WriteByte((byte(nameLength)))
	cmd.dataOffset += nameLength
	_, err = v.write(cmd)
	return err
}

// writeOperationForOperation writes one Operation. Operations with a custom
// encoder are written as BLOB particles; successfully encoded operations are
// marked as used so they are cached on re-use.
func (cmd *baseCommand) writeOperationForOperation(operation *Operation) error {
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], operation.binName)

	if operation.used {
		// cache will set the used flag to false again
		operation.cache()
	}

	if operation.encoder == nil {
		valueLength, err := operation.binValue.EstimateSize()
		if err != nil {
			return err
		}

		cmd.WriteInt32(int32(nameLength + valueLength + 4))
		cmd.WriteByte((operation.opType.op))
		cmd.WriteByte((byte(operation.binValue.GetType())))
		cmd.WriteByte((byte(0)))
		cmd.WriteByte((byte(nameLength)))
		cmd.dataOffset += nameLength
		_, err = operation.binValue.write(cmd)
		return err
	}

	// First call sizes the payload (nil buffer), second call encodes it.
	valueLength, err := operation.encoder(operation, nil)
	if err != nil {
		return err
	}

	cmd.WriteInt32(int32(nameLength + valueLength + 4))
	cmd.WriteByte((operation.opType.op))
	cmd.WriteByte((byte(ParticleType.BLOB)))
	cmd.WriteByte((byte(0)))
	cmd.WriteByte((byte(nameLength)))
	cmd.dataOffset += nameLength
	_, err = operation.encoder(operation, cmd)
	//mark the operation as used, so that it will be cached the next time it is used
	operation.used = err == nil
	return err
}

// writeOperationForBinName writes one operation that carries a bin name but
// no value (e.g. reading a single bin).
func (cmd *baseCommand) writeOperationForBinName(name string, operation OperationType) {
	nameLength := copy(cmd.dataBuffer[(cmd.dataOffset+int(_OPERATION_HEADER_SIZE)):], name)
	cmd.WriteInt32(int32(nameLength + 4))
	cmd.WriteByte((operation.op))
	cmd.WriteByte(byte(0))
	cmd.WriteByte(byte(0))
	cmd.WriteByte(byte(nameLength))
	cmd.dataOffset += nameLength
}

// writeOperationForOperationType writes a bare operation header with neither
// bin name nor value (e.g. touch).
func (cmd *baseCommand) writeOperationForOperationType(operation OperationType) {
	cmd.WriteInt32(int32(4))
	cmd.WriteByte(operation.op)
	cmd.WriteByte(0)
	cmd.WriteByte(0)
	cmd.WriteByte(0)
}

// writePredExp writes the predicate expression field. predSize must match
// the value previously computed by estimatePredExpSize.
func (cmd *baseCommand) writePredExp(predExp []PredExp, predSize int) error {
	cmd.writeFieldHeader(predSize, PREDEXP)
	for i := range predExp {
		if err := predExp[i].marshal(cmd); err != nil {
			return err
		}
	}
	return nil
}

// writeFieldValue writes a field containing a typed value: field header,
// one particle-type byte, then the value bytes.
func (cmd *baseCommand) writeFieldValue(value Value, ftype FieldType) error {
	vlen, err := value.EstimateSize()
	if err != nil {
		return err
	}
	cmd.writeFieldHeader(vlen+1, ftype)
	cmd.WriteByte(byte(value.GetType()))

	_, err = value.write(cmd)
	return err
}

// writeUdfArgs writes the UDF argument list field. A nil value writes an
// empty argument list.
func (cmd *baseCommand) writeUdfArgs(value *ValueArray) error {
	if value != nil {
		vlen, err := value.EstimateSize()
		if err != nil {
			return err
		}
		cmd.writeFieldHeader(vlen, UDF_ARGLIST)
		_, err = value.pack(cmd)
		return err
	}

	cmd.writeFieldHeader(0, UDF_ARGLIST)
	return nil

}

// writeFieldInt32 writes a 4-byte integer field.
func (cmd *baseCommand) writeFieldInt32(val int32, ftype FieldType) {
	cmd.writeFieldHeader(4, ftype)
	cmd.WriteInt32(val)
}

// writeFieldInt64 writes an 8-byte integer field.
func (cmd *baseCommand) writeFieldInt64(val int64, ftype FieldType) {
	cmd.writeFieldHeader(8, ftype)
	cmd.WriteInt64(val)
}

// writeFieldString writes a string field. The string is copied past the
// header first so its byte length is known when the header is written.
// NOTE: the local `len` shadows the builtin for the rest of this function.
func (cmd *baseCommand) writeFieldString(str string, ftype FieldType) {
	len := copy(cmd.dataBuffer[(cmd.dataOffset+int(_FIELD_HEADER_SIZE)):], str)
	cmd.writeFieldHeader(len, ftype)
	cmd.dataOffset += len
}

// writeFieldBytes writes a raw byte-slice field.
func (cmd *baseCommand) writeFieldBytes(bytes []byte, ftype FieldType) {
	copy(cmd.dataBuffer[cmd.dataOffset+int(_FIELD_HEADER_SIZE):], bytes)

	cmd.writeFieldHeader(len(bytes), ftype)
	cmd.dataOffset += len(bytes)
}

// writeFieldHeader writes a field header: 4-byte size (payload + type byte)
// followed by the field type.
func (cmd *baseCommand) writeFieldHeader(size int, ftype FieldType) {
	cmd.WriteInt32(int32(size + 1))
	cmd.WriteByte((byte(ftype)))
}

// WriteInt64 writes an int64 to the buffer in big-endian byte order.
1563func (cmd *baseCommand) WriteInt64(num int64) (int, error) { 1564 return cmd.WriteUint64(uint64(num)) 1565} 1566 1567// Uint64ToBytes converts an uint64 into slice of Bytes. 1568func (cmd *baseCommand) WriteUint64(num uint64) (int, error) { 1569 binary.BigEndian.PutUint64(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+8], num) 1570 cmd.dataOffset += 8 1571 return 8, nil 1572} 1573 1574// Int32ToBytes converts an int32 to a byte slice of size 4 1575func (cmd *baseCommand) WriteInt32(num int32) (int, error) { 1576 return cmd.WriteUint32(uint32(num)) 1577} 1578 1579// Uint32ToBytes converts an uint32 to a byte slice of size 4 1580func (cmd *baseCommand) WriteUint32(num uint32) (int, error) { 1581 binary.BigEndian.PutUint32(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+4], num) 1582 cmd.dataOffset += 4 1583 return 4, nil 1584} 1585 1586// Uint32ToBytes converts an uint32 to a byte slice of size 4 1587func (cmd *baseCommand) WriteUint32At(num uint32, index int) (int, error) { 1588 binary.BigEndian.PutUint32(cmd.dataBuffer[index:index+4], num) 1589 return 4, nil 1590} 1591 1592// Int16ToBytes converts an int16 to slice of bytes 1593func (cmd *baseCommand) WriteInt16(num int16) (int, error) { 1594 return cmd.WriteUint16(uint16(num)) 1595} 1596 1597// Int16ToBytes converts an int16 to slice of bytes 1598func (cmd *baseCommand) WriteUint16(num uint16) (int, error) { 1599 binary.BigEndian.PutUint16(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+2], num) 1600 cmd.dataOffset += 2 1601 return 2, nil 1602} 1603 1604func (cmd *baseCommand) WriteFloat32(float float32) (int, error) { 1605 bits := math.Float32bits(float) 1606 binary.BigEndian.PutUint32(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+4], bits) 1607 cmd.dataOffset += 4 1608 return 4, nil 1609} 1610 1611func (cmd *baseCommand) WriteFloat64(float float64) (int, error) { 1612 bits := math.Float64bits(float) 1613 binary.BigEndian.PutUint64(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+8], bits) 1614 cmd.dataOffset += 8 1615 
return 8, nil 1616} 1617 1618func (cmd *baseCommand) WriteByte(b byte) error { 1619 cmd.dataBuffer[cmd.dataOffset] = b 1620 cmd.dataOffset++ 1621 return nil 1622} 1623 1624func (cmd *baseCommand) WriteString(s string) (int, error) { 1625 copy(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+len(s)], s) 1626 cmd.dataOffset += len(s) 1627 return len(s), nil 1628} 1629 1630func (cmd *baseCommand) Write(b []byte) (int, error) { 1631 copy(cmd.dataBuffer[cmd.dataOffset:cmd.dataOffset+len(b)], b) 1632 cmd.dataOffset += len(b) 1633 return len(b), nil 1634} 1635 1636func (cmd *baseCommand) begin() { 1637 cmd.dataOffset = int(_MSG_TOTAL_HEADER_SIZE) 1638} 1639 1640func (cmd *baseCommand) sizeBuffer(compress bool) error { 1641 return cmd.sizeBufferSz(cmd.dataOffset, compress) 1642} 1643 1644func (cmd *baseCommand) validateHeader(header int64) error { 1645 msgVersion := (uint64(header) & 0xFF00000000000000) >> 56 1646 if msgVersion != 2 { 1647 return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid Message Header: Expected version to be 2, but got %v", msgVersion)) 1648 } 1649 1650 msgType := (uint64(header) & 0x00FF000000000000) >> 49 1651 if !(msgType == 1 || msgType == 3 || msgType == 4) { 1652 return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid Message Header: Expected type to be 1, 3 or 4, but got %v", msgType)) 1653 } 1654 1655 msgSize := header & 0x0000FFFFFFFFFFFF 1656 if msgSize > int64(MaxBufferSize) { 1657 return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid Message Header: Expected size to be under 10MiB, but got %v", msgSize)) 1658 } 1659 1660 return nil 1661} 1662 1663var ( 1664 // MaxBufferSize protects against allocating massive memory blocks 1665 // for buffers. Tweak this number if you are returning a lot of 1666 // LDT elements in your queries. 
1667 MaxBufferSize = 1024 * 1024 * 120 // 120 MB 1668) 1669 1670const ( 1671 msgHeaderPad = 16 1672 zlibHeaderPad = 2 1673) 1674 1675func (cmd *baseCommand) sizeBufferSz(size int, willCompress bool) error { 1676 1677 if willCompress { 1678 // adds zlib and proto pads to the size of the buffer 1679 size += msgHeaderPad + zlibHeaderPad 1680 } 1681 1682 // Corrupted data streams can result in a huge length. 1683 // Do a sanity check here. 1684 if size > MaxBufferSize || size < 0 { 1685 return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid size for buffer: %d", size)) 1686 } 1687 1688 if size <= len(cmd.dataBuffer) { 1689 // don't touch the buffer 1690 } else if size <= cap(cmd.dataBuffer) { 1691 cmd.dataBuffer = cmd.dataBuffer[:size] 1692 } else { 1693 // not enough space 1694 cmd.dataBuffer = make([]byte, size) 1695 } 1696 1697 // The trick here to keep a ref to the buffer, and set the buffer itself 1698 // to a padded version of the original: 1699 // | Proto Header | Original Compressed Size | compressed message | 1700 // | 8 Bytes | 8 Bytes | | 1701 if willCompress { 1702 cmd.dataBufferCompress = cmd.dataBuffer 1703 cmd.dataBuffer = cmd.dataBufferCompress[msgHeaderPad+zlibHeaderPad:] 1704 } 1705 1706 return nil 1707} 1708 1709func (cmd *baseCommand) end() { 1710 var proto = int64(cmd.dataOffset-8) | (_CL_MSG_VERSION << 56) | (_AS_MSG_TYPE << 48) 1711 binary.BigEndian.PutUint64(cmd.dataBuffer[0:], uint64(proto)) 1712} 1713 1714func (cmd *baseCommand) markCompressed(policy Policy) { 1715 cmd.compressed = policy.compress() 1716} 1717 1718func (cmd *baseCommand) compress() error { 1719 if cmd.compressed && cmd.dataOffset > _COMPRESS_THRESHOLD { 1720 b := bytes.NewBuffer(cmd.dataBufferCompress[msgHeaderPad:]) 1721 b.Reset() 1722 w := zlib.NewWriter(b) 1723 1724 // There seems to be a bug either in Go's zlib or in zlibc 1725 // which messes up a single write block of bigger than 64KB to 1726 // the deflater. 
1727 // Things work in multiple writes of 64KB though, so this is 1728 // how we're going to do it. 1729 i := 0 1730 const step = 64 * 1024 1731 for i+step < cmd.dataOffset { 1732 n, err := w.Write(cmd.dataBuffer[i : i+step]) 1733 i += n 1734 if err != nil { 1735 return err 1736 } 1737 } 1738 1739 if i < cmd.dataOffset { 1740 _, err := w.Write(cmd.dataBuffer[i:cmd.dataOffset]) 1741 if err != nil { 1742 return err 1743 } 1744 } 1745 1746 // flush 1747 w.Close() 1748 1749 compressedSz := b.Len() 1750 1751 // Use compressed buffer if compression completed within original buffer size. 1752 var proto = int64(compressedSz+8) | (_CL_MSG_VERSION << 56) | (_AS_MSG_TYPE_COMPRESSED << 48) 1753 binary.BigEndian.PutUint64(cmd.dataBufferCompress[0:], uint64(proto)) 1754 binary.BigEndian.PutUint64(cmd.dataBufferCompress[8:], uint64(cmd.dataOffset)) 1755 1756 cmd.dataBuffer = cmd.dataBufferCompress 1757 cmd.dataOffset = compressedSz + 16 1758 cmd.dataBufferCompress = nil 1759 } 1760 1761 return nil 1762} 1763 1764// isCompressed returns the length of the compressed buffer. 
// If the buffer is not compressed, the result will be -1.
// The size is taken from the lower 6 bytes of the proto header; the message
// type in byte 1 must equal _AS_MSG_TYPE_COMPRESSED.
func (cmd *baseCommand) compressedSize() int {
	proto := Buffer.BytesToInt64(cmd.dataBuffer, 0)
	size := proto & 0xFFFFFFFFFFFF

	msgType := (proto >> 48) & 0xff

	if msgType != _AS_MSG_TYPE_COMPRESSED {
		return -1
	}

	return int(size)
}

////////////////////////////////////

// setInDoubt marks an AerospikeError as in-doubt (the server may or may not
// have applied the transaction) based on whether the command was a read and
// how many times it was actually sent. Non-Aerospike errors pass through.
func setInDoubt(err error, isRead bool, commandSentCounter int) error {
	// set inDoubt flag
	if ae, ok := err.(AerospikeError); ok {
		ae.SetInDoubt(isRead, commandSentCounter)
		return ae
	}

	return err
}

// execute runs the command with the policy's default deadline and a fresh
// iteration/sent counter.
func (cmd *baseCommand) execute(ifc command, isRead bool) error {
	policy := ifc.getPolicy(ifc).GetBasePolicy()
	deadline := policy.deadline()

	return cmd.executeAt(ifc, policy, isRead, deadline, -1, 0)
}

// executeAt is the main retry loop: it picks a node, acquires a connection,
// writes the command buffer, sends it and parses the result, retrying on
// transient failures until success, MaxRetries is exceeded or the total
// deadline passes. iterations of -1 means "first attempt".
func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, isRead bool, deadline time.Time, iterations, commandSentCounter int) (err error) {
	// for exponential backoff
	interval := policy.SleepBetweenRetries

	shouldSleep := false
	isClientTimeout := false

	// Execute command until successful, timed out or maximum iterations have been reached.
	for {
		iterations++

		// too many retries
		if (policy.MaxRetries <= 0 && iterations > 0) || (policy.MaxRetries > 0 && iterations > policy.MaxRetries) {
			if ae, ok := err.(AerospikeError); ok {
				err = NewAerospikeError(ae.ResultCode(), fmt.Sprintf("command execution timed out on client: Exceeded number of retries. See `Policy.MaxRetries`. (last error: %s)", err.Error()))
			}

			return setInDoubt(err, isRead, commandSentCounter)
		}

		// Sleep before trying again, after the first iteration
		if policy.SleepBetweenRetries > 0 && shouldSleep {
			// Do not sleep if you know you'll wake up after the deadline
			if policy.TotalTimeout > 0 && time.Now().Add(interval).After(deadline) {
				break
			}

			time.Sleep(interval)
			// Exponential backoff: grow the interval for the next retry.
			if policy.SleepMultiplier > 1 {
				interval = time.Duration(float64(interval) * policy.SleepMultiplier)
			}
		}

		// On retries, let the command adjust (e.g. switch replica) and give
		// batch commands a chance to split into per-node sub-commands.
		if shouldSleep {
			aerr, ok := err.(AerospikeError)
			if !ifc.prepareRetry(ifc, isClientTimeout || (ok && aerr.ResultCode() != SERVER_NOT_AVAILABLE)) {
				if bc, ok := ifc.(batcher); ok {
					// Batch may be retried in separate commands.
					if retry, err := bc.retryBatch(bc, cmd.node.cluster, deadline, iterations, commandSentCounter); retry {
						// Batch was retried in separate commands. Complete this command.
						return err
					}
				}
			}
		}

		// NOTE: This is important to be after the prepareRetry block above
		isClientTimeout = false

		shouldSleep = true

		// check for command timeout
		if policy.TotalTimeout > 0 && time.Now().After(deadline) {
			break
		}

		// set command node, so when you return a record it has the node
		cmd.node, err = ifc.getNode(ifc)
		if cmd.node == nil || !cmd.node.IsActive() || err != nil {
			isClientTimeout = true

			// Node is currently inactive. Retry.
			continue
		}

		cmd.conn, err = ifc.getConnection(policy)
		if err != nil {
			isClientTimeout = true

			if err == ErrConnectionPoolEmpty {
				// if the connection pool is empty, we still haven't tried
				// the transaction to increase the iteration count.
				iterations--
			}
			Logger.Debug("Node " + cmd.node.String() + ": " + err.Error())
			continue
		}

		// Assign the connection buffer to the command buffer
		cmd.dataBuffer = cmd.conn.dataBuffer

		// Set command buffer.
		err = ifc.writeBuffer(ifc)
		if err != nil {
			// All runtime exceptions are considered fatal. Do not retry.
			// Close socket to flush out possible garbage. Do not put back in pool.
			cmd.conn.Close()
			cmd.conn = nil
			return err
		}

		// Reset timeout in send buffer (destined for server) and socket.
		// Bytes 22-25 of the header carry the server-side timeout in ms.
		binary.BigEndian.PutUint32(cmd.dataBuffer[22:], 0)
		if !deadline.IsZero() {
			serverTimeout := deadline.Sub(time.Now())
			if serverTimeout < time.Millisecond {
				serverTimeout = time.Millisecond
			}
			binary.BigEndian.PutUint32(cmd.dataBuffer[22:], uint32(serverTimeout/time.Millisecond))
		}

		// now that the deadline has been set in the buffer, compress the contents
		if err = cmd.compress(); err != nil {
			return NewAerospikeError(SERIALIZE_ERROR, err.Error())
		}

		// Send command.
		_, err = cmd.conn.Write(cmd.dataBuffer[:cmd.dataOffset])
		if err != nil {
			isClientTimeout = true

			// IO errors are considered temporary anomalies. Retry.
			// Close socket to flush out possible garbage. Do not put back in pool.
			cmd.conn.Close()
			cmd.conn = nil

			Logger.Debug("Node " + cmd.node.String() + ": " + err.Error())
			continue
		}
		commandSentCounter++

		// Parse results.
		err = ifc.parseResult(ifc, cmd.conn)
		if err != nil {
			// Network-level failures (client timeout, EOF, net.Error) are
			// retried; a server-reported TIMEOUT result code is not a
			// client timeout.
			if _, ok := err.(net.Error); err == ErrTimeout || err == io.EOF || ok {
				isClientTimeout = true
				if err != ErrTimeout {
					if aerr, ok := err.(AerospikeError); ok && aerr.ResultCode() == TIMEOUT {
						isClientTimeout = false
					}
				}

				// IO errors are considered temporary anomalies. Retry.
				// Close socket to flush out possible garbage. Do not put back in pool.
				cmd.conn.Close()

				Logger.Debug("Node " + cmd.node.String() + ": " + err.Error())

				// retry only for non-streaming commands
				if !cmd.oneShot {
					cmd.conn = nil
					continue
				}
			}

			// close the connection
			// cancelling/closing the batch/multi commands will return an error, which will
			// close the connection to throw away its data and signal the server about the
			// situation. We will not put back the connection in the buffer.
			if cmd.conn.IsConnected() && KeepConnection(err) {
				// Put connection back in pool.
				cmd.node.PutConnection(cmd.conn)
			} else {
				cmd.conn.Close()
				cmd.conn = nil
			}

			return setInDoubt(err, isRead, commandSentCounter)
		}

		// in case it has grown and re-allocated
		if len(cmd.dataBufferCompress) > len(cmd.dataBuffer) {
			cmd.conn.dataBuffer = cmd.dataBufferCompress
		} else {
			cmd.conn.dataBuffer = cmd.dataBuffer
		}

		// Put connection back in pool.
		// cmd.node.PutConnection(cmd.conn)
		ifc.putConnection(cmd.conn)

		// command has completed successfully. Exit method.
		return nil

	}

	// execution timeout
	return ErrTimeout
}

// parseRecordResults must be overridden by concrete commands; reaching this
// implementation is a programming error.
func (cmd *baseCommand) parseRecordResults(ifc command, receiveSize int) (bool, error) {
	panic("Abstract method. Should not end up here")
}