1package tablestore
2
3import (
4	"bytes"
5	"encoding/binary"
6	"fmt"
7	"io"
8	"math"
9)
10
// Wire-format constants for the plain buffer encoding handled in this file.
// All of them are untyped so they can be used both as byte values (when
// writing) and as int values (when comparing against a decoded tag).
const (
	// HEADER is the value written as a 32-bit little-endian integer at the
	// start of a plain buffer and checked again when rows are decoded.
	HEADER = 0x75

	// Tag types: single-byte markers that introduce each section of an
	// encoded row or cell.
	TAG_ROW_PK             = 0x1
	TAG_ROW_DATA           = 0x2
	TAG_CELL               = 0x3
	TAG_CELL_NAME          = 0x4
	TAG_CELL_VALUE         = 0x5
	TAG_CELL_TYPE          = 0x6
	TAG_CELL_TIMESTAMP     = 0x7
	TAG_DELETE_ROW_MARKER  = 0x8
	TAG_ROW_CHECKSUM       = 0x9
	TAG_CELL_CHECKSUM      = 0x0A
	TAG_EXTENSION          = 0x0B
	TAG_SEQ_INFO           = 0x0C
	TAG_SEQ_INFO_EPOCH     = 0x0D
	TAG_SEQ_INFO_TS        = 0x0E
	TAG_SEQ_INFO_ROW_INDEX = 0x0F

	// Cell operation types.
	DELETE_ALL_VERSION = 0x1
	DELETE_ONE_VERSION = 0x3
	INCREMENT          = 0x4

	// Variant types: the type byte that precedes an encoded cell value.
	VT_INTEGER = 0x0
	VT_DOUBLE  = 0x1
	VT_BOOLEAN = 0x2
	VT_STRING  = 0x3

	// VT_NULL (0x6) is not defined in this package; its declaration is kept
	// commented out:
	// VT_NULL = 0x6
	VT_BLOB           = 0x7
	VT_INF_MIN        = 0x9
	VT_INF_MAX        = 0xa
	VT_AUTO_INCREMENT = 0xb

	// Byte widths of the fixed-size little-endian integers.
	LITTLE_ENDIAN_32_SIZE = 4
	LITTLE_ENDIAN_64_SIZE = 8
)
51
// spaceSize is the number of distinct byte values and therefore the number
// of entries in the CRC-8 lookup table.
const spaceSize = 256

// crc8Table maps every byte value to its CRC-8 remainder. It is filled in
// once by init.
var crc8Table = make([]byte, spaceSize)

// init precomputes crc8Table one bit at a time, most significant bit first,
// using the polynomial 0x07.
func init() {
	for idx := range crc8Table {
		rem := byte(idx)
		for bit := 0; bit < 8; bit++ {
			// Remember the top bit before it is shifted out.
			msbSet := rem&0x80 != 0
			rem <<= 1
			if msbSet {
				rem ^= 0x07
			}
		}
		crc8Table[idx] = rem
	}
}
69
70func crc8Byte(crc, in byte) byte {
71	return crc8Table[(crc^in)&0xff]
72}
73
74func crc8Int32(crc byte, in int32) byte {
75	for i := 0; i < 4; i++ {
76		crc = crc8Byte(crc, byte((in & 0xff)))
77		in >>= 8
78	}
79
80	return crc
81}
82
83func crc8Int64(crc byte, in int64) byte {
84	for i := 0; i < 8; i++ {
85		crc = crc8Byte(crc, byte((in & 0xff)))
86		in >>= 8
87	}
88
89	return crc
90}
91
92func crc8Bytes(crc byte, in []byte) byte {
93	for i := 0; i < len(in); i++ {
94		crc = crc8Byte(crc, in[i])
95	}
96
97	return crc
98}
99
// writeRawByte writes the single byte value to w. The result of the write
// is discarded.
func writeRawByte(w io.Writer, value byte) {
	single := [1]byte{value}
	w.Write(single[:])
}
103
104/*func writeRawByteInt8(w io.Writer, value int) {
105	w.Write([]byte{byte(value)})
106}*/
107
// writeRawLittleEndian32 writes value to w as four bytes in little-endian
// order (least significant byte first).
//
// The bytes are assembled first and handed to w in a single Write call, so
// the writer never sees a partially written integer. The result of the
// write is discarded.
func writeRawLittleEndian32(w io.Writer, value int32) {
	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], uint32(value))
	w.Write(buf[:])
}
114
// writeRawLittleEndian64 writes value to w as eight bytes in little-endian
// order (least significant byte first).
//
// The bytes are assembled first and handed to w in a single Write call, so
// the writer never sees a partially written integer. The result of the
// write is discarded.
func writeRawLittleEndian64(w io.Writer, value int64) {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uint64(value))
	w.Write(buf[:])
}
125
126func writeDouble(w io.Writer, value float64) {
127	writeRawLittleEndian64(w, int64(math.Float64bits(value)))
128}
129
// writeBoolean writes value to w as a single byte: 1 for true, 0 for false.
// The result of the write is discarded.
func writeBoolean(w io.Writer, value bool) {
	var flag byte
	if value {
		flag = 1
	}
	w.Write([]byte{flag})
}
137
// writeBytes writes value to w verbatim, with no length prefix. The byte
// count and error returned by the writer are deliberately discarded.
func writeBytes(w io.Writer, value []byte) {
	_, _ = w.Write(value)
}
141
142func writeHeader(w io.Writer) {
143	writeRawLittleEndian32(w, HEADER)
144}
145
146func writeTag(w io.Writer, tag byte) {
147	writeRawByte(w, tag)
148}
149
150func writeCellName(w io.Writer, name []byte) {
151	writeTag(w, TAG_CELL_NAME)
152	writeRawLittleEndian32(w, int32(len(name)))
153	writeBytes(w, name)
154}
155
156type PlainBufferCell struct {
157	cellName         []byte
158	cellValue        *ColumnValue
159	cellTimestamp    int64
160	cellType         byte
161	ignoreValue      bool
162	hasCellTimestamp bool
163	hasCellType      bool
164}
165
166func (cell *PlainBufferCell) writeCell(w io.Writer) {
167	writeTag(w, TAG_CELL)
168	writeCellName(w, cell.cellName)
169	if cell.ignoreValue == false {
170		cell.cellValue.writeCellValue(w)
171	}
172
173	if cell.hasCellType {
174		writeTag(w, TAG_CELL_TYPE)
175		writeRawByte(w, cell.cellType)
176	}
177
178	if cell.hasCellTimestamp {
179		writeTag(w, TAG_CELL_TIMESTAMP)
180		writeRawLittleEndian64(w, cell.cellTimestamp)
181	}
182
183	writeTag(w, TAG_CELL_CHECKSUM)
184	writeRawByte(w, cell.getCheckSum(byte(0x0)))
185}
186
187func (cell *PlainBufferCell) getCheckSum(crc byte) byte {
188	crc = crc8Bytes(crc, cell.cellName)
189	if cell.ignoreValue == false {
190		crc = cell.cellValue.getCheckSum(crc)
191	}
192
193	if cell.hasCellTimestamp {
194		crc = crc8Int64(crc, cell.cellTimestamp)
195	}
196	if cell.hasCellType {
197		crc = crc8Byte(crc, cell.cellType)
198	}
199	return crc
200}
201
202type PlainBufferRow struct {
203	primaryKey      []*PlainBufferCell
204	cells           []*PlainBufferCell
205	hasDeleteMarker bool
206	extension       *RecordSequenceInfo // optional
207}
208
209func (row *PlainBufferRow) writeRow(w io.Writer) {
210	/* pk */
211	writeTag(w, TAG_ROW_PK)
212	for _, pk := range row.primaryKey {
213		pk.writeCell(w)
214	}
215
216	if len(row.cells) > 0 {
217		writeTag(w, TAG_ROW_DATA)
218		for _, cell := range row.cells {
219			cell.writeCell(w)
220		}
221	}
222
223	writeTag(w, TAG_ROW_CHECKSUM)
224	writeRawByte(w, row.getCheckSum(byte(0x0)))
225}
226
227func (row *PlainBufferRow) writeRowWithHeader(w io.Writer) {
228	writeHeader(w)
229	row.writeRow(w)
230}
231
232func (row *PlainBufferRow) getCheckSum(crc byte) byte {
233	for _, cell := range row.primaryKey {
234		crcCell := cell.getCheckSum(byte(0x0))
235		crc = crc8Byte(crc, crcCell)
236	}
237
238	for _, cell := range row.cells {
239		crcCell := cell.getCheckSum(byte(0x0))
240		crc = crc8Byte(crc, crcCell)
241	}
242
243	del := byte(0x0)
244	if row.hasDeleteMarker {
245		del = byte(0x1)
246	}
247
248	crc = crc8Byte(crc, del)
249
250	return crc
251}
252
253func readRawByte(r *bytes.Reader) byte {
254	if r.Len() == 0 {
255		panic(errUnexpectIoEnd)
256	}
257
258	b, _ := r.ReadByte()
259
260	return b
261}
262
263func readTag(r *bytes.Reader) int {
264	return int(readRawByte(r))
265}
266
267func readRawLittleEndian64(r *bytes.Reader) int64 {
268	if r.Len() < 8 {
269		panic(errUnexpectIoEnd)
270	}
271
272	var v int64
273	binary.Read(r, binary.LittleEndian, &v)
274
275	return v
276}
277
278func readRawLittleEndian32(r *bytes.Reader) int32 {
279	if r.Len() < 4 {
280		panic(errUnexpectIoEnd)
281	}
282
283	var v int32
284	binary.Read(r, binary.LittleEndian, &v)
285
286	return v
287}
288
289func readBoolean(r *bytes.Reader) bool {
290	return readRawByte(r) != 0
291}
292
293func readBytes(r *bytes.Reader, size int32) []byte {
294	if int32(r.Len()) < size {
295		panic(errUnexpectIoEnd)
296	}
297	v := make([]byte, size)
298	r.Read(v)
299	return v
300}
301
302func readCellValue(r *bytes.Reader) *ColumnValue {
303	value := new(ColumnValue)
304	readRawLittleEndian32(r)
305	tp := readRawByte(r)
306	switch tp {
307	case VT_INTEGER:
308		value.Type = ColumnType_INTEGER
309		value.Value = readRawLittleEndian64(r)
310	case VT_DOUBLE:
311		value.Type = ColumnType_DOUBLE
312		value.Value = math.Float64frombits(uint64(readRawLittleEndian64(r)))
313	case VT_BOOLEAN:
314		value.Type = ColumnType_BOOLEAN
315		value.Value = readBoolean(r)
316	case VT_STRING:
317		value.Type = ColumnType_STRING
318		value.Value = string(readBytes(r, readRawLittleEndian32(r)))
319	case VT_BLOB:
320		value.Type = ColumnType_BINARY
321		value.Value = []byte(readBytes(r, readRawLittleEndian32(r)))
322	}
323	return value
324}
325
326func readCell(r *bytes.Reader) *PlainBufferCell {
327	cell := new(PlainBufferCell)
328	tag := readTag(r)
329	if tag != TAG_CELL_NAME {
330		panic(errTag)
331	}
332
333	cell.cellName = readBytes(r, readRawLittleEndian32(r))
334	tag = readTag(r)
335
336	if tag == TAG_CELL_VALUE {
337		cell.cellValue = readCellValue(r)
338		tag = readTag(r)
339	}
340	if tag == TAG_CELL_TYPE {
341		readRawByte(r)
342		tag = readTag(r)
343	}
344
345	if tag == TAG_CELL_TIMESTAMP {
346		cell.cellTimestamp = readRawLittleEndian64(r)
347		tag = readTag(r)
348	}
349
350	if tag == TAG_CELL_CHECKSUM {
351		readRawByte(r)
352	} else {
353		panic(errNoChecksum)
354	}
355
356	return cell
357}
358
359func readRowPk(r *bytes.Reader) []*PlainBufferCell {
360	primaryKeyColumns := make([]*PlainBufferCell, 0, 4)
361
362	tag := readTag(r)
363	for tag == TAG_CELL {
364		primaryKeyColumns = append(primaryKeyColumns, readCell(r))
365		tag = readTag(r)
366	}
367
368	r.Seek(-1, 1)
369
370	return primaryKeyColumns
371}
372
373func readRowData(r *bytes.Reader) []*PlainBufferCell {
374	columns := make([]*PlainBufferCell, 0, 10)
375
376	tag := readTag(r)
377	for tag == TAG_CELL {
378		columns = append(columns, readCell(r))
379		tag = readTag(r)
380	}
381
382	r.Seek(-1, 1)
383
384	return columns
385}
386
387func readRow(r *bytes.Reader) *PlainBufferRow {
388	row := new(PlainBufferRow)
389	tag := readTag(r)
390	if tag == TAG_ROW_PK {
391		row.primaryKey = readRowPk(r)
392		tag = readTag(r)
393	}
394
395	if tag == TAG_ROW_DATA {
396		row.cells = readRowData(r)
397		tag = readTag(r)
398	}
399
400	if tag == TAG_DELETE_ROW_MARKER {
401		row.hasDeleteMarker = true
402		tag = readTag(r)
403	}
404
405	if tag == TAG_EXTENSION {
406		row.extension = readRowExtension(r)
407		tag = readTag(r)
408	}
409
410	if tag == TAG_ROW_CHECKSUM {
411		readRawByte(r)
412	} else {
413		panic(errNoChecksum)
414	}
415	return row
416}
417
418func readRowsWithHeader(r *bytes.Reader) (rows []*PlainBufferRow, err error) {
419	defer func() {
420		if err2 := recover(); err2 != nil {
421			if _, ok := err2.(error); ok {
422				err = err2.(error)
423			}
424			return
425		}
426	}()
427
428	// TODO: panic
429	if readRawLittleEndian32(r) != HEADER {
430		return nil, fmt.Errorf("Invalid header from plain buffer")
431	}
432
433	rows = make([]*PlainBufferRow, 0, 10)
434
435	for r.Len() > 0 {
436		rows = append(rows, readRow(r))
437	}
438
439	return rows, nil
440}
441
442func readRowExtension(r *bytes.Reader) *RecordSequenceInfo {
443	readRawLittleEndian32(r) // useless
444	tag := readTag(r)
445	if tag != TAG_SEQ_INFO {
446		panic(errTag)
447	}
448
449	readRawLittleEndian32(r) // useless
450	tag = readTag(r)
451	if tag != TAG_SEQ_INFO_EPOCH {
452		panic(errTag)
453	}
454	epoch := readRawLittleEndian32(r)
455
456	tag = readTag(r)
457	if tag != TAG_SEQ_INFO_TS {
458		panic(errTag)
459	}
460	ts := readRawLittleEndian64(r)
461
462	tag = readTag(r)
463	if tag != TAG_SEQ_INFO_ROW_INDEX {
464		panic(errTag)
465	}
466	rowIndex := readRawLittleEndian32(r)
467
468	ext := RecordSequenceInfo{}
469	ext.Epoch = epoch
470	ext.Timestamp = ts
471	ext.RowIndex = rowIndex
472	return &ext
473}
474