1// Copyright 2013-2020 Aerospike, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package aerospike
16
17import (
18	"fmt"
19	"reflect"
20	"time"
21
22	. "github.com/aerospike/aerospike-client-go/types"
23	xrand "github.com/aerospike/aerospike-client-go/types/rand"
24	Buffer "github.com/aerospike/aerospike-client-go/utils/buffer"
25)
26
// baseMultiCommand is the shared base of commands whose response is a stream
// of records read from a single node. It embeds baseCommand and adds the state
// used to parse that stream and to hand every record to a consumer.
type baseMultiCommand struct {
	baseCommand

	// namespace and clusterKey are passed to queryValidate by execute; a zero
	// clusterKey turns that validation off. first makes execute skip the
	// validation that would otherwise run before the command is executed.
	namespace  string
	clusterKey int64
	first      bool

	// NOTE(review): not referenced in the part of the file visible here;
	// confirm whether it is still needed next to terminationErrorType.
	terminationError ResultCode

	// recordset receives the parsed records on its Records channel; its
	// cancelled channel aborts a blocked send.
	recordset *Recordset

	// terminationErrorType is the result code of the error returned by
	// parseRecordResults when delivery is aborted through the cancel channel.
	terminationErrorType ResultCode

	// NOTE(review): not referenced in the part of the file visible here.
	errChan chan error

	// NOTE(review): not referenced in the part of the file visible here.
	socketTimeout time.Duration

	// Reflection state used when records are delivered as user objects:
	// resObjType is the type instantiated for every record, and selectCases is
	// passed to reflect.Select with case 0 as the send of the object and case 1
	// as the cancel signal. resObjMappings is not used in the visible code;
	// presumably it maps names to struct field indexes — confirm where
	// prepareReflectionData is implemented.
	resObjType     reflect.Type
	resObjMappings map[string][]int
	selectCases    []reflect.SelectCase

	// bc is the buffered reader over the connection; parseResult creates it
	// and readBytes consumes from it.
	bc bufferedConn
}
50
51var multiObjectParser func(
52	cmd *baseMultiCommand,
53	obj reflect.Value,
54	opCount int,
55	fieldCount int,
56	generation uint32,
57	expiration uint32,
58) error
59
60var prepareReflectionData func(cmd *baseMultiCommand)
61
62func newMultiCommand(node *Node, recordset *Recordset) *baseMultiCommand {
63	cmd := &baseMultiCommand{
64		baseCommand: baseCommand{
65			node:    node,
66			oneShot: true,
67		},
68		recordset: recordset,
69	}
70
71	if prepareReflectionData != nil {
72		prepareReflectionData(cmd)
73	}
74	return cmd
75}
76
77func newCorrectMultiCommand(node *Node, recordset *Recordset, namespace string, clusterKey int64, first bool) *baseMultiCommand {
78	cmd := &baseMultiCommand{
79		baseCommand: baseCommand{
80			node:    node,
81			oneShot: true,
82		},
83		namespace:  namespace,
84		clusterKey: clusterKey,
85		first:      first,
86		recordset:  recordset,
87	}
88
89	if prepareReflectionData != nil {
90		prepareReflectionData(cmd)
91	}
92	return cmd
93}
94
// getNode returns the node this command was created for. The ifc argument is
// not consulted and the returned error is always nil.
func (cmd *baseMultiCommand) getNode(ifc command) (*Node, error) {
	return cmd.node, nil
}
98
// prepareRetry always reports false, regardless of ifc and isTimeout.
// NOTE(review): presumably this tells the caller that a multi-record command
// must not be retried — confirm against the retry loop that calls it.
func (cmd *baseMultiCommand) prepareRetry(ifc command, isTimeout bool) bool {
	return false
}
102
103func (cmd *baseMultiCommand) getConnection(policy Policy) (*Connection, error) {
104	return cmd.node.getConnectionWithHint(policy.GetBasePolicy().deadline(), policy.GetBasePolicy().socketTimeout(), byte(xrand.Int64()%256))
105}
106
107func (cmd *baseMultiCommand) putConnection(conn *Connection) {
108	cmd.node.putConnectionWithHint(conn, byte(xrand.Int64()%256))
109}
110
// parseResult reads the node's response one protocol message at a time and
// passes the size of every message body to ifc.parseRecordResults, which
// parses the records in it. It loops until parseRecordResults reports that no
// more messages follow, a compressed message turns out to have no body, or an
// error occurs.
//
// All reads go through cmd.bc, a buffered reader created over conn. If record
// parsing fails, the buffered connection is drained before the error is
// returned; header read and validation errors are returned directly.
func (cmd *baseMultiCommand) parseResult(ifc command, conn *Connection) error {
	// Read socket into receive buffer one record at a time.  Do not read entire receive size
	// because the receive buffer would be too big.

	// status stays true for as long as another message is expected.
	status := true

	var err error

	cmd.bc = newBufferedConn(conn, 0)
	for status {
		// NOTE(review): presumably this switches inflation off so that the next
		// header is read raw — confirm against initInflater.
		if err := cmd.conn.initInflater(false, 0); err != nil {
			return NewAerospikeError(PARSE_ERROR, "Error setting up zlib inflater:", err.Error())
		}
		cmd.bc.reset(8)

		// Read header.
		if cmd.dataBuffer, err = cmd.bc.read(8); err != nil {
			return err
		}

		// The message size is carried in the low 48 bits of the header.
		proto := Buffer.BytesToInt64(cmd.dataBuffer, 0)
		receiveSize := int(proto & 0xFFFFFFFFFFFF)
		if receiveSize <= 0 {
			// Nothing follows this header; go on to the next one.
			continue
		}

		if compressedSize := cmd.compressedSize(); compressedSize > 0 {
			cmd.bc.reset(8)
			// Read the next 8 bytes, which hold a size value.
			// NOTE(review): presumably the uncompressed message size including
			// its own 8-byte header, hence the "- 8" below — confirm.
			if cmd.dataBuffer, err = cmd.bc.read(8); err != nil {
				return err
			}

			receiveSize = int(Buffer.BytesToInt64(cmd.dataBuffer, 0)) - 8
			if err := cmd.conn.initInflater(true, compressedSize-8); err != nil {
				return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Error setting up zlib inflater for size `%d`: %s", compressedSize-8, err.Error()))
			}

			// read the first 8 bytes after the inflater has been set up; they
			// are the header that gets validated below.
			cmd.bc.reset(8)
			if cmd.dataBuffer, err = cmd.bc.read(8); err != nil {
				return err
			}
		}

		// Validate header to make sure we are at the beginning of a message
		proto = Buffer.BytesToInt64(cmd.dataBuffer, 0)
		if err := cmd.validateHeader(proto); err != nil {
			return err
		}

		if receiveSize > 0 {
			cmd.bc.reset(receiveSize)

			// parseRecordResults returns whether further messages are expected.
			status, err = ifc.parseRecordResults(ifc, receiveSize)
			if err != nil {
				cmd.bc.drainConn()
				return err
			}
		} else {
			// Only reachable on the compressed path, where receiveSize was
			// recomputed above: no body, so stop reading.
			status = false
		}
	}

	// if the buffer has been resized, put it back so that it will be reassigned to the connection.
	cmd.dataBuffer = cmd.bc.buf()

	return nil
}
179
180func (cmd *baseMultiCommand) parseKey(fieldCount int) (*Key, error) {
181	var digest [20]byte
182	var namespace, setName string
183	var userKey Value
184	var err error
185
186	for i := 0; i < fieldCount; i++ {
187		if err = cmd.readBytes(4); err != nil {
188			return nil, err
189		}
190
191		fieldlen := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
192		if err = cmd.readBytes(fieldlen); err != nil {
193			return nil, err
194		}
195
196		fieldtype := FieldType(cmd.dataBuffer[0])
197		size := fieldlen - 1
198
199		switch fieldtype {
200		case DIGEST_RIPE:
201			copy(digest[:], cmd.dataBuffer[1:size+1])
202		case NAMESPACE:
203			namespace = string(cmd.dataBuffer[1 : size+1])
204		case TABLE:
205			setName = string(cmd.dataBuffer[1 : size+1])
206		case KEY:
207			if userKey, err = bytesToKeyValue(int(cmd.dataBuffer[1]), cmd.dataBuffer, 2, size-1); err != nil {
208				return nil, err
209			}
210		}
211	}
212
213	return &Key{namespace: namespace, setName: setName, digest: digest, userKey: userKey}, nil
214}
215
216func (cmd *baseMultiCommand) readBytes(length int) (err error) {
217	// Corrupted data streams can result in a huge length.
218	// Do a sanity check here.
219	if length > MaxBufferSize || length < 0 {
220		return NewAerospikeError(PARSE_ERROR, fmt.Sprintf("Invalid readBytes length: %d", length))
221	}
222
223	cmd.dataBuffer, err = cmd.bc.read(length)
224	if err != nil {
225		return err
226	}
227	cmd.dataOffset += length
228
229	return nil
230}
231
232func (cmd *baseMultiCommand) parseRecordResults(ifc command, receiveSize int) (bool, error) {
233	// Read/parse remaining message bytes one record at a time.
234	cmd.dataOffset = 0
235
236	for cmd.dataOffset < receiveSize {
237		if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
238			err = newNodeError(cmd.node, err)
239			return false, err
240		}
241		resultCode := ResultCode(cmd.dataBuffer[5] & 0xFF)
242
243		if resultCode != 0 {
244			if resultCode == KEY_NOT_FOUND_ERROR || resultCode == FILTERED_OUT {
245				return false, nil
246			}
247			err := NewAerospikeError(resultCode)
248			err = newNodeError(cmd.node, err)
249			return false, err
250		}
251
252		info3 := int(cmd.dataBuffer[3])
253
254		// If cmd is the end marker of the response, do not proceed further
255		if (info3 & _INFO3_LAST) == _INFO3_LAST {
256			return false, nil
257		}
258
259		generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
260		expiration := TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
261		fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
262		opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
263
264		key, err := cmd.parseKey(fieldCount)
265		if err != nil {
266			err = newNodeError(cmd.node, err)
267			return false, err
268		}
269
270		// if there is a recordset, process the record traditionally
271		// otherwise, it is supposed to be a record channel
272		if cmd.selectCases == nil {
273			// Parse bins.
274			var bins BinMap
275
276			for i := 0; i < opCount; i++ {
277				if err := cmd.readBytes(8); err != nil {
278					err = newNodeError(cmd.node, err)
279					return false, err
280				}
281
282				opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
283				particleType := int(cmd.dataBuffer[5])
284				nameSize := int(cmd.dataBuffer[7])
285
286				if err := cmd.readBytes(nameSize); err != nil {
287					err = newNodeError(cmd.node, err)
288					return false, err
289				}
290				name := string(cmd.dataBuffer[:nameSize])
291
292				particleBytesSize := opSize - (4 + nameSize)
293				if err = cmd.readBytes(particleBytesSize); err != nil {
294					err = newNodeError(cmd.node, err)
295					return false, err
296				}
297				value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize)
298				if err != nil {
299					err = newNodeError(cmd.node, err)
300					return false, err
301				}
302
303				if bins == nil {
304					bins = make(BinMap, opCount)
305				}
306				bins[name] = value
307			}
308
309			// If the channel is full and it blocks, we don't want this command to
310			// block forever, or panic in case the channel is closed in the meantime.
311			select {
312			// send back the result on the async channel
313			case cmd.recordset.Records <- newRecord(cmd.node, key, bins, generation, expiration):
314			case <-cmd.recordset.cancelled:
315				return false, NewAerospikeError(cmd.terminationErrorType)
316			}
317		} else if multiObjectParser != nil {
318			obj := reflect.New(cmd.resObjType)
319			if err := multiObjectParser(cmd, obj, opCount, fieldCount, generation, expiration); err != nil {
320				err = newNodeError(cmd.node, err)
321				return false, err
322			}
323
324			// set the object to send
325			cmd.selectCases[0].Send = obj
326
327			chosen, _, _ := reflect.Select(cmd.selectCases)
328			switch chosen {
329			case 0: // object sent
330			case 1: // cancel channel is closed
331				return false, NewAerospikeError(cmd.terminationErrorType)
332			}
333		}
334	}
335
336	return true, nil
337}
338
339func (cmd *baseMultiCommand) execute(ifc command, isRead bool) error {
340
341	/***************************************************************************
342	IMPORTANT: 	No need to send the error here to the recordset.Error channel.
343				It is being sent from the downstream command from the result
344				returned from the function.
345	****************************************************************************/
346
347	if cmd.clusterKey != 0 {
348		if !cmd.first {
349			if err := queryValidate(cmd.node, cmd.namespace, cmd.clusterKey); err != nil {
350				return err
351			}
352		}
353
354		if err := cmd.baseCommand.execute(ifc, isRead); err != nil {
355			return err
356		}
357
358		return queryValidate(cmd.node, cmd.namespace, cmd.clusterKey)
359	}
360
361	return cmd.baseCommand.execute(ifc, isRead)
362}
363