1// Copyright 2013-2020 Aerospike, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aerospike
16
17import (
18	. "github.com/aerospike/aerospike-client-go/types"
19)
20
21type batchNode struct {
22	Node    *Node
23	offsets []int
24}
25
26func newBatchNodeList(cluster *Cluster, policy *BatchPolicy, keys []*Key) ([]*batchNode, error) {
27	nodes := cluster.GetNodes()
28
29	if len(nodes) == 0 {
30		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.")
31	}
32
33	// Create initial key capacity for each node as average + 25%.
34	keysPerNode := len(keys) / len(nodes)
35	keysPerNode += keysPerNode / 2
36
37	// The minimum key capacity is 10.
38	if keysPerNode < 10 {
39		keysPerNode = 10
40	}
41
42	replicaPolicy := policy.ReplicaPolicy
43	replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy())
44
45	// Split keys by server node.
46	batchNodes := make([]*batchNode, 0, len(nodes))
47
48	for i := range keys {
49		node, err := GetNodeBatchRead(cluster, keys[i], replicaPolicy, replicaPolicySC, 0, 0)
50		if err != nil {
51			return nil, err
52		}
53
54		if batchNode := findBatchNode(batchNodes, node); batchNode == nil {
55			batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, i))
56		} else {
57			batchNode.AddKey(i)
58		}
59	}
60	return batchNodes, nil
61}
62
63func newBatchNodeListKeys(cluster *Cluster, policy *BatchPolicy, keys []*Key, sequenceAP, sequenceSC int, batchSeed *batchNode) ([]*batchNode, error) {
64	nodes := cluster.GetNodes()
65
66	if len(nodes) == 0 {
67		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.")
68	}
69
70	// Create initial key capacity for each node as average + 25%.
71	keysPerNode := len(keys) / len(nodes)
72	keysPerNode += keysPerNode / 2
73
74	// The minimum key capacity is 10.
75	if keysPerNode < 10 {
76		keysPerNode = 10
77	}
78
79	replicaPolicy := policy.ReplicaPolicy
80	replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy())
81
82	// Split keys by server node.
83	batchNodes := make([]*batchNode, 0, len(nodes))
84
85	for _, offset := range batchSeed.offsets {
86		node, err := GetNodeBatchRead(cluster, keys[offset], replicaPolicy, replicaPolicySC, sequenceAP, sequenceSC)
87		if err != nil {
88			return nil, err
89		}
90
91		if batchNode := findBatchNode(batchNodes, node); batchNode == nil {
92			batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, offset))
93		} else {
94			batchNode.AddKey(offset)
95		}
96	}
97	return batchNodes, nil
98}
99
100func newBatchNodeListRecords(cluster *Cluster, policy *BatchPolicy, records []*BatchRead, sequenceAP, sequenceSC int, batchSeed *batchNode) ([]*batchNode, error) {
101	nodes := cluster.GetNodes()
102
103	if len(nodes) == 0 {
104		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.")
105	}
106
107	// Create initial key capacity for each node as average + 25%.
108	keysPerNode := len(batchSeed.offsets) / len(nodes)
109	keysPerNode += keysPerNode / 2
110
111	// The minimum key capacity is 10.
112	if keysPerNode < 10 {
113		keysPerNode = 10
114	}
115
116	replicaPolicy := policy.ReplicaPolicy
117	replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy())
118
119	// Split keys by server node.
120	batchNodes := make([]*batchNode, 0, len(nodes))
121
122	for _, offset := range batchSeed.offsets {
123		node, err := GetNodeBatchRead(cluster, records[offset].Key, replicaPolicy, replicaPolicySC, sequenceAP, sequenceSC)
124		if err != nil {
125			return nil, err
126		}
127
128		if batchNode := findBatchNode(batchNodes, node); batchNode == nil {
129			batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, offset))
130		} else {
131			batchNode.AddKey(offset)
132		}
133	}
134	return batchNodes, nil
135}
136
137func newBatchIndexNodeList(cluster *Cluster, policy *BatchPolicy, records []*BatchRead) ([]*batchNode, error) {
138	nodes := cluster.GetNodes()
139
140	if len(nodes) == 0 {
141		return nil, NewAerospikeError(SERVER_NOT_AVAILABLE, "command failed because cluster is empty.")
142	}
143
144	// Create initial key capacity for each node as average + 25%.
145	keysPerNode := len(records) / len(nodes)
146	keysPerNode += keysPerNode / 2
147
148	// The minimum key capacity is 10.
149	if keysPerNode < 10 {
150		keysPerNode = 10
151	}
152
153	replicaPolicy := policy.ReplicaPolicy
154	replicaPolicySC := GetReplicaPolicySC(policy.GetBasePolicy())
155
156	// Split keys by server node.
157	batchNodes := make([]*batchNode, 0, len(nodes))
158
159	for i := range records {
160		node, err := GetNodeBatchRead(cluster, records[i].Key, replicaPolicy, replicaPolicySC, 0, 0)
161		if err != nil {
162			return nil, err
163		}
164
165		if batchNode := findBatchNode(batchNodes, node); batchNode == nil {
166			batchNodes = append(batchNodes, newBatchNode(node, keysPerNode, i))
167		} else {
168			batchNode.AddKey(i)
169		}
170	}
171	return batchNodes, nil
172}
173
174func newBatchNode(node *Node, capacity int, offset int) *batchNode {
175	res := &batchNode{
176		Node:    node,
177		offsets: make([]int, 1, capacity),
178	}
179
180	res.offsets[0] = offset
181	return res
182}
183
184func (bn *batchNode) AddKey(offset int) {
185	bn.offsets = append(bn.offsets, offset)
186}
187
188func findBatchNode(nodes []*batchNode, node *Node) *batchNode {
189	for i := range nodes {
190		// Note: using pointer equality for performance.
191		if nodes[i].Node == node {
192			return nodes[i]
193		}
194	}
195	return nil
196}
197