1// Copyright 2013-2020 Aerospike, Inc. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package aerospike 16 17import ( 18 . "github.com/aerospike/aerospike-client-go/types" 19) 20 21type batchNode struct { 22 Node *Node 23 offsets []int 24} 25 26func newBatchNodeList(cluster *Cluster, policy *BatchPolicy, keys []*Key) ([]*batchNode, error) { 27 nodes := cluster.GetNodes() 28 29 if len(nodes) == 0 { 30 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.") 31 } 32 33 // Create initial key capacity for each node as average + 25%. 34 keysPerNode := len(keys) / len(nodes) 35 keysPerNode += keysPerNode / 2 36 37 // The minimum key capacity is 10. 38 if keysPerNode < 10 { 39 keysPerNode = 10 40 } 41 42 replicaPolicy := policy.ReplicaPolicy 43 replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy()) 44 45 // Split keys by server node. 46 batchNodes := make([]*batchNode, 0, len(nodes)) 47 48 for i := range keys { 49 node, err := GetNodeBatchRead(cluster, keys[i], replicaPolicy, replicaPolicySC, 0, 0) 50 if err != nil { 51 return nil, err 52 } 53 54 if batchNode := findBatchNode(batchNodes, node); batchNode == nil { 55 batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, i)) 56 } else { 57 batchNode.AddKey(i) 58 } 59 } 60 return batchNodes, nil 61} 62 63func newBatchNodeListKeys(cluster *Cluster, policy *BatchPolicy, keys []*Key, sequenceAP, sequenceSC int, batchSeed *batchNode) ([]*batchNode, error) { 64 nodes := cluster.GetNodes() 65 66 if len(nodes) == 0 { 67 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.") 68 } 69 70 // Create initial key capacity for each node as average + 25%. 71 keysPerNode := len(keys) / len(nodes) 72 keysPerNode += keysPerNode / 2 73 74 // The minimum key capacity is 10. 75 if keysPerNode < 10 { 76 keysPerNode = 10 77 } 78 79 replicaPolicy := policy.ReplicaPolicy 80 replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy()) 81 82 // Split keys by server node. 83 batchNodes := make([]*batchNode, 0, len(nodes)) 84 85 for _, offset := range batchSeed.offsets { 86 node, err := GetNodeBatchRead(cluster, keys[offset], replicaPolicy, replicaPolicySC, sequenceAP, sequenceSC) 87 if err != nil { 88 return nil, err 89 } 90 91 if batchNode := findBatchNode(batchNodes, node); batchNode == nil { 92 batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, offset)) 93 } else { 94 batchNode.AddKey(offset) 95 } 96 } 97 return batchNodes, nil 98} 99 100func newBatchNodeListRecords(cluster *Cluster, policy *BatchPolicy, records []*BatchRead, sequenceAP, sequenceSC int, batchSeed *batchNode) ([]*batchNode, error) { 101 nodes := cluster.GetNodes() 102 103 if len(nodes) == 0 { 104 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.") 105 } 106 107 // Create initial key capacity for each node as average + 25%. 108 keysPerNode := len(batchSeed.offsets) / len(nodes) 109 keysPerNode += keysPerNode / 2 110 111 // The minimum key capacity is 10. 112 if keysPerNode < 10 { 113 keysPerNode = 10 114 } 115 116 replicaPolicy := policy.ReplicaPolicy 117 replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy()) 118 119 // Split keys by server node. 120 batchNodes := make([]*batchNode, 0, len(nodes)) 121 122 for _, offset := range batchSeed.offsets { 123 node, err := GetNodeBatchRead(cluster, records[offset].Key, replicaPolicy, replicaPolicySC, sequenceAP, sequenceSC) 124 if err != nil { 125 return nil, err 126 } 127 128 if batchNode := findBatchNode(batchNodes, node); batchNode == nil { 129 batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, offset)) 130 } else { 131 batchNode.AddKey(offset) 132 } 133 } 134 return batchNodes, nil 135} 136 137func newBatchIndexNodeList(cluster *Cluster, policy *BatchPolicy, records []*BatchRead) ([]*batchNode, error) { 138 nodes := cluster.GetNodes() 139 140 if len(nodes) == 0 { 141 return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.") 142 } 143 144 // Create initial key capacity for each node as average + 25%. 145 keysPerNode := len(records) / len(nodes) 146 keysPerNode += keysPerNode / 2 147 148 // The minimum key capacity is 10. 149 if keysPerNode < 10 { 150 keysPerNode = 10 151 } 152 153 replicaPolicy := policy.ReplicaPolicy 154 replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy()) 155 156 // Split keys by server node. 157 batchNodes := make([]*batchNode, 0, len(nodes)) 158 159 for i := range records { 160 node, err := GetNodeBatchRead(cluster, records[i].Key, replicaPolicy, replicaPolicySC, 0, 0) 161 if err != nil { 162 return nil, err 163 } 164 165 if batchNode := findBatchNode(batchNodes, node); batchNode == nil { 166 batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, i)) 167 } else { 168 batchNode.AddKey(i) 169 } 170 } 171 return batchNodes, nil 172} 173 174func newBatchNode(node *Node, capacity int, offset int) *batchNode { 175 res := &batchNode{ 176 Node: node, 177 offsets: make([]int, 1, capacity), 178 } 179 180 res.offsets[0] = offset 181 return res 182} 183 184func (bn *batchNode) AddKey(offset int) { 185 bn.offsets = append(bn.offsets, offset) 186} 187 188func findBatchNode(nodes []*batchNode, node *Node) *batchNode { 189 for i := range nodes { 190 // Note: using pointer equality for performance. 191 if nodes[i].Node == node { 192 return nodes[i] 193 } 194 } 195 return nil 196} 197