1package protocol
2
3import (
4	"bytes"
5	"encoding/binary"
6	"fmt"
7	"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
8	"io"
9	"math"
10)
11
// Wire-format constants for the plain-buffer row encoding handled in this file.
const (
	// HEADER is the 32-bit value emitted by writeHeader and required by
	// ReadRowsWithHeader at the start of a buffer.
	HEADER = 0x75

	// Tag bytes: each one is written/read as a single byte that announces
	// which element of a row or cell follows.
	TAG_ROW_PK             = 0x1
	TAG_ROW_DATA           = 0x2
	TAG_CELL               = 0x3
	TAG_CELL_NAME          = 0x4
	TAG_CELL_VALUE         = 0x5
	TAG_CELL_TYPE          = 0x6
	TAG_CELL_TIMESTAMP     = 0x7
	TAG_DELETE_ROW_MARKER  = 0x8
	TAG_ROW_CHECKSUM       = 0x9
	TAG_CELL_CHECKSUM      = 0x0A
	TAG_EXTENSION          = 0x0B
	TAG_SEQ_INFO           = 0x0C
	TAG_SEQ_INFO_EPOCH     = 0x0D
	TAG_SEQ_INFO_TS        = 0x0E
	TAG_SEQ_INFO_ROW_INDEX = 0x0F

	// Cell operation types: the byte that follows TAG_CELL_TYPE and that
	// readCell stores in PlainBufferCell.CellType.
	DELETE_ALL_VERSION = 0x1
	DELETE_ONE_VERSION = 0x3

	// Variant types: the byte that identifies the kind of an encoded cell value.
	VT_INTEGER = 0x0
	VT_DOUBLE  = 0x1
	VT_BOOLEAN = 0x2
	VT_STRING  = 0x3

	// 0x6 is intentionally not defined here; the original note was a
	// Java-style declaration: public final static byte VT_NULL = 0x6;
	VT_BLOB           = 0x7
	VT_INF_MIN        = 0x9
	VT_INF_MAX        = 0xa
	VT_AUTO_INCREMENT = 0xb

	// Encoded sizes, in bytes, of 32-bit and 64-bit little-endian integers;
	// used when computing value length prefixes.
	LITTLE_ENDIAN_32_SIZE = 4
	LITTLE_ENDIAN_64_SIZE = 8
)
51
// spaceSize is the number of entries in the CRC-8 lookup table, one per
// possible byte value.
const spaceSize = 256

// crc8Table maps a byte to its CRC-8 remainder. It is filled in by init.
var crc8Table = make([]byte, spaceSize)

// init precomputes crc8Table: every byte value is shifted left eight times,
// and 0x07 is XORed in each time the bit shifted out was set.
func init() {
	const poly = 0x07
	for idx := range crc8Table {
		rem := byte(idx)
		for bit := 0; bit < 8; bit++ {
			carry := rem&0x80 != 0
			rem <<= 1
			if carry {
				rem ^= poly
			}
		}
		crc8Table[idx] = rem
	}
}
69
70func crc8Byte(crc, in byte) byte {
71	return crc8Table[(crc^in)&0xff]
72}
73
74func crc8Int32(crc byte, in int32) byte {
75	for i := 0; i < 4; i++ {
76		crc = crc8Byte(crc, byte((in & 0xff)))
77		in >>= 8
78	}
79
80	return crc
81}
82
83func crc8Int64(crc byte, in int64) byte {
84	for i := 0; i < 8; i++ {
85		crc = crc8Byte(crc, byte((in & 0xff)))
86		in >>= 8
87	}
88
89	return crc
90}
91
92func crc8Bytes(crc byte, in []byte) byte {
93	for i := 0; i < len(in); i++ {
94		crc = crc8Byte(crc, in[i])
95	}
96
97	return crc
98}
99
// writeRawByte writes the single byte value to w. The result of the Write
// call is not inspected.
func writeRawByte(w io.Writer, value byte) {
	single := [1]byte{value}
	w.Write(single[:])
}
103
104/*func writeRawByteInt8(w io.Writer, value int) {
105	w.Write([]byte{byte(value)})
106}*/
107
// writeRawLittleEndian32 writes value to w as four bytes, least significant
// byte first.
//
// The bytes are assembled in a fixed-size buffer and handed to w in a single
// Write call, rather than as four one-byte writes with a fresh slice each.
// The function has no error return, so the result of Write is not reported.
func writeRawLittleEndian32(w io.Writer, value int32) {
	var buf [4]byte
	binary.LittleEndian.PutUint32(buf[:], uint32(value))
	w.Write(buf[:])
}
114
// writeRawLittleEndian64 writes value to w as eight bytes, least significant
// byte first.
//
// The bytes are assembled in a fixed-size buffer and handed to w in a single
// Write call, rather than as eight one-byte writes with a fresh slice each.
// The function has no error return, so the result of Write is not reported.
func writeRawLittleEndian64(w io.Writer, value int64) {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uint64(value))
	w.Write(buf[:])
}
125
126func writeDouble(w io.Writer, value float64) {
127	writeRawLittleEndian64(w, int64(math.Float64bits(value)))
128}
129
// writeBoolean writes value to w as one byte: 1 for true, 0 for false.
func writeBoolean(w io.Writer, value bool) {
	var flag byte
	if value {
		flag = 1
	}
	w.Write([]byte{flag})
}
137
// writeBytes writes value to w unchanged, with no length prefix. The result
// of the Write call is deliberately discarded.
func writeBytes(w io.Writer, value []byte) {
	_, _ = w.Write(value)
}
141
142func writeHeader(w io.Writer) {
143	writeRawLittleEndian32(w, HEADER)
144}
145
146func writeTag(w io.Writer, tag byte) {
147	writeRawByte(w, tag)
148}
149
150func writeCellName(w io.Writer, name []byte) {
151	writeTag(w, TAG_CELL_NAME)
152	writeRawLittleEndian32(w, int32(len(name)))
153	writeBytes(w, name)
154}
155
// ColumnType identifies the kind of value stored in a ColumnValue.
type ColumnType int32

// Column types. The Go type noted for each one is the type that
// readCellValue stores in ColumnValue.Value for it.
const (
	ColumnType_STRING  ColumnType = 1 // Value is a string
	ColumnType_INTEGER ColumnType = 2 // Value is an int64
	ColumnType_BOOLEAN ColumnType = 3 // Value is a bool
	ColumnType_DOUBLE  ColumnType = 4 // Value is a float64
	ColumnType_BINARY  ColumnType = 5 // Value is a []byte
	ColumnType_MIN     ColumnType = 6 // decoded from VT_INF_MIN; Value is left unset
	ColumnType_MAX     ColumnType = 7 // decoded from VT_INF_MAX; Value is left unset
)
167
// ColumnValue is a typed cell value. Type selects how Value is interpreted;
// the encoding methods type-assert Value to the Go type that Type implies.
type ColumnValue struct {
	// Type is the kind of value held in Value.
	Type  ColumnType
	// Value is the payload: string, int64, bool, float64 or []byte depending on Type.
	Value interface{}
}
172
173func (cv *ColumnValue) writeCellValue(w io.Writer) {
174	writeTag(w, TAG_CELL_VALUE)
175	if cv == nil {
176		writeRawLittleEndian32(w, 1)
177		writeRawByte(w, VT_AUTO_INCREMENT)
178		return
179	}
180
181	switch cv.Type {
182	case ColumnType_STRING:
183		v := cv.Value.(string)
184
185		writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_32_SIZE+1+len(v))) // length + type + value
186		writeRawByte(w, VT_STRING)
187		writeRawLittleEndian32(w, int32(len(v)))
188		writeBytes(w, []byte(v))
189
190	case ColumnType_INTEGER:
191		v := cv.Value.(int64)
192		writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_64_SIZE+1))
193		writeRawByte(w, VT_INTEGER)
194		writeRawLittleEndian64(w, v)
195	case ColumnType_BOOLEAN:
196		v := cv.Value.(bool)
197		writeRawLittleEndian32(w, 2)
198		writeRawByte(w, VT_BOOLEAN)
199		writeBoolean(w, v)
200
201	case ColumnType_DOUBLE:
202		v := cv.Value.(float64)
203
204		writeRawLittleEndian32(w, LITTLE_ENDIAN_64_SIZE+1)
205		writeRawByte(w, VT_DOUBLE)
206		writeDouble(w, v)
207
208	case ColumnType_BINARY:
209		v := cv.Value.([]byte)
210
211		writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_32_SIZE+1+len(v))) // length + type + value
212		writeRawByte(w, VT_BLOB)
213		writeRawLittleEndian32(w, int32(len(v)))
214		writeBytes(w, v)
215	}
216}
217
218func (cv *ColumnValue) getCheckSum(crc byte) byte {
219	if cv == nil {
220		return crc8Byte(crc, VT_AUTO_INCREMENT)
221	}
222
223	switch cv.Type {
224	case ColumnType_STRING:
225		v := cv.Value.(string)
226		crc = crc8Byte(crc, VT_STRING)
227		crc = crc8Int32(crc, int32(len(v)))
228		crc = crc8Bytes(crc, []byte(v))
229	case ColumnType_INTEGER:
230		v := cv.Value.(int64)
231		crc = crc8Byte(crc, VT_INTEGER)
232		crc = crc8Int64(crc, v)
233	case ColumnType_BOOLEAN:
234		v := cv.Value.(bool)
235		crc = crc8Byte(crc, VT_BOOLEAN)
236		if v {
237			crc = crc8Byte(crc, 0x1)
238		} else {
239			crc = crc8Byte(crc, 0x0)
240		}
241
242	case ColumnType_DOUBLE:
243		v := cv.Value.(float64)
244		crc = crc8Byte(crc, VT_DOUBLE)
245		crc = crc8Int64(crc, int64(math.Float64bits(v)))
246	case ColumnType_BINARY:
247		v := cv.Value.([]byte)
248		crc = crc8Byte(crc, VT_BLOB)
249		crc = crc8Int32(crc, int32(len(v)))
250		crc = crc8Bytes(crc, v)
251	}
252
253	return crc
254}
255
// PlainBufferCell is one encoded cell: a name plus an optional value, cell
// type and timestamp. The Has*/Ignore* flags decide which optional parts
// writeCell emits and getCheckSum covers.
type PlainBufferCell struct {
	// CellName is the raw name, written with a 32-bit length prefix.
	CellName         []byte
	// CellValue is the value; a nil pointer is encoded as VT_AUTO_INCREMENT.
	CellValue        *ColumnValue
	// CellTimestamp is written as a 64-bit integer when HasCellTimestamp is set.
	CellTimestamp    int64
	// CellType is the cell operation byte (DELETE_ALL_VERSION or
	// DELETE_ONE_VERSION when set by readCell); written when HasCellType is set.
	CellType         byte
	// IgnoreValue, when true, makes writeCell and getCheckSum skip CellValue.
	IgnoreValue      bool
	// HasCellTimestamp reports whether CellTimestamp is part of the cell.
	HasCellTimestamp bool
	// HasCellType reports whether CellType is part of the cell.
	HasCellType      bool
}
265
266func (cell *PlainBufferCell) writeCell(w io.Writer) {
267	writeTag(w, TAG_CELL)
268	writeCellName(w, cell.CellName)
269	if cell.IgnoreValue == false {
270		cell.CellValue.writeCellValue(w)
271	}
272
273	if cell.HasCellType {
274		writeTag(w, TAG_CELL_TYPE)
275		writeRawByte(w, cell.CellType)
276	}
277
278	if cell.HasCellTimestamp {
279		writeTag(w, TAG_CELL_TIMESTAMP)
280		writeRawLittleEndian64(w, cell.CellTimestamp)
281	}
282
283	writeTag(w, TAG_CELL_CHECKSUM)
284	writeRawByte(w, cell.getCheckSum(byte(0x0)))
285}
286
287func (cell *PlainBufferCell) getCheckSum(crc byte) byte {
288	crc = crc8Bytes(crc, cell.CellName)
289	if cell.IgnoreValue == false {
290		crc = cell.CellValue.getCheckSum(crc)
291	}
292
293	if cell.HasCellTimestamp {
294		crc = crc8Int64(crc, cell.CellTimestamp)
295	}
296	if cell.HasCellType {
297		crc = crc8Byte(crc, cell.CellType)
298	}
299	return crc
300}
301
// PlainBufferRow is one row of a plain buffer: its primary-key cells, its
// data cells, a delete-marker flag and an optional sequence-info extension.
type PlainBufferRow struct {
	// PrimaryKey holds the cells that follow TAG_ROW_PK.
	PrimaryKey      []*PlainBufferCell
	// Cells holds the cells that follow TAG_ROW_DATA; it may be empty.
	Cells           []*PlainBufferCell
	// HasDeleteMarker is set by readRow when TAG_DELETE_ROW_MARKER is present
	// and is folded into the row checksum.
	HasDeleteMarker bool
	// Extension is filled by readRow when TAG_EXTENSION is present; nil otherwise.
	Extension       *tablestore.RecordSequenceInfo // optional
}
308
309func (row *PlainBufferRow) writeRow(w io.Writer) {
310	/* pk */
311	writeTag(w, TAG_ROW_PK)
312	for _, pk := range row.PrimaryKey {
313		pk.writeCell(w)
314	}
315
316	if len(row.Cells) > 0 {
317		writeTag(w, TAG_ROW_DATA)
318		for _, cell := range row.Cells {
319			cell.writeCell(w)
320		}
321	}
322
323	writeTag(w, TAG_ROW_CHECKSUM)
324	writeRawByte(w, row.getCheckSum(byte(0x0)))
325}
326
327func (row *PlainBufferRow) writeRowWithHeader(w io.Writer) {
328	writeHeader(w)
329	row.writeRow(w)
330}
331
332func (row *PlainBufferRow) getCheckSum(crc byte) byte {
333	for _, cell := range row.PrimaryKey {
334		crcCell := cell.getCheckSum(byte(0x0))
335		crc = crc8Byte(crc, crcCell)
336	}
337
338	for _, cell := range row.Cells {
339		crcCell := cell.getCheckSum(byte(0x0))
340		crc = crc8Byte(crc, crcCell)
341	}
342
343	del := byte(0x0)
344	if row.HasDeleteMarker {
345		del = byte(0x1)
346	}
347
348	crc = crc8Byte(crc, del)
349
350	return crc
351}
352
353func readRawByte(r *bytes.Reader) byte {
354	if r.Len() == 0 {
355		panic(errUnexpectIoEnd)
356	}
357
358	b, _ := r.ReadByte()
359
360	return b
361}
362
363func readTag(r *bytes.Reader) int {
364	return int(readRawByte(r))
365}
366
367func readRawLittleEndian64(r *bytes.Reader) int64 {
368	if r.Len() < 8 {
369		panic(errUnexpectIoEnd)
370	}
371
372	var v int64
373	binary.Read(r, binary.LittleEndian, &v)
374
375	return v
376}
377
378func readRawLittleEndian32(r *bytes.Reader) int32 {
379	if r.Len() < 4 {
380		panic(errUnexpectIoEnd)
381	}
382
383	var v int32
384	binary.Read(r, binary.LittleEndian, &v)
385
386	return v
387}
388
389func readBoolean(r *bytes.Reader) bool {
390	return readRawByte(r) != 0
391}
392
393func readBytes(r *bytes.Reader, size int32) []byte {
394	if int32(r.Len()) < size {
395		panic(errUnexpectIoEnd)
396	}
397	v := make([]byte, size)
398	r.Read(v)
399	return v
400}
401
402func readCellValue(r *bytes.Reader) *ColumnValue {
403	value := new(ColumnValue)
404	readRawLittleEndian32(r)
405	tp := readRawByte(r)
406	switch tp {
407	case VT_INTEGER:
408		value.Type = ColumnType_INTEGER
409		value.Value = readRawLittleEndian64(r)
410	case VT_DOUBLE:
411		value.Type = ColumnType_DOUBLE
412		value.Value = math.Float64frombits(uint64(readRawLittleEndian64(r)))
413	case VT_BOOLEAN:
414		value.Type = ColumnType_BOOLEAN
415		value.Value = readBoolean(r)
416	case VT_STRING:
417		value.Type = ColumnType_STRING
418		value.Value = string(readBytes(r, readRawLittleEndian32(r)))
419	case VT_BLOB:
420		value.Type = ColumnType_BINARY
421		value.Value = []byte(readBytes(r, readRawLittleEndian32(r)))
422	case VT_INF_MAX:
423		value.Type = ColumnType_MAX
424	case VT_INF_MIN:
425		value.Type = ColumnType_MIN
426	}
427	return value
428}
429
430func readCell(r *bytes.Reader) *PlainBufferCell {
431	cell := new(PlainBufferCell)
432	tag := readTag(r)
433	if tag != TAG_CELL_NAME {
434		panic(errTag)
435	}
436
437	cell.CellName = readBytes(r, readRawLittleEndian32(r))
438	tag = readTag(r)
439
440	if tag == TAG_CELL_VALUE {
441		cell.CellValue = readCellValue(r)
442		tag = readTag(r)
443	}
444	if tag == TAG_CELL_TYPE {
445		b := readRawByte(r)
446		switch b {
447		case DELETE_ALL_VERSION:
448			cell.CellType = DELETE_ALL_VERSION
449		case DELETE_ONE_VERSION:
450			cell.CellType = DELETE_ONE_VERSION
451		}
452		tag = readTag(r)
453	}
454
455	if tag == TAG_CELL_TIMESTAMP {
456		cell.CellTimestamp = readRawLittleEndian64(r)
457		cell.HasCellTimestamp = true
458		tag = readTag(r)
459	}
460
461	if tag == TAG_CELL_CHECKSUM {
462		readRawByte(r)
463	} else {
464		panic(errNoChecksum)
465	}
466
467	return cell
468}
469
470func readRowPk(r *bytes.Reader) []*PlainBufferCell {
471	primaryKeyColumns := make([]*PlainBufferCell, 0, 4)
472	if readTag(r) != TAG_ROW_PK {
473		panic(errTag)
474	}
475
476	tag := readTag(r)
477	for tag == TAG_CELL {
478		primaryKeyColumns = append(primaryKeyColumns, readCell(r))
479		tag = readTag(r)
480	}
481
482	r.Seek(-1, 1)
483
484	return primaryKeyColumns
485}
486
487func readRowData(r *bytes.Reader) []*PlainBufferCell {
488	columns := make([]*PlainBufferCell, 0, 10)
489
490	tag := readTag(r)
491	for tag == TAG_CELL {
492		columns = append(columns, readCell(r))
493		tag = readTag(r)
494	}
495
496	r.Seek(-1, 1)
497
498	return columns
499}
500
501func readRow(r *bytes.Reader) *PlainBufferRow {
502	row := new(PlainBufferRow)
503	row.PrimaryKey = readRowPk(r)
504	tag := readTag(r)
505
506	if tag == TAG_ROW_DATA {
507		row.Cells = readRowData(r)
508		tag = readTag(r)
509	}
510
511	if tag == TAG_DELETE_ROW_MARKER {
512		row.HasDeleteMarker = true
513		tag = readTag(r)
514	}
515
516	if tag == TAG_EXTENSION {
517		row.Extension = readRowExtension(r)
518		tag = readTag(r)
519	}
520
521	if tag == TAG_ROW_CHECKSUM {
522		readRawByte(r)
523	} else {
524		panic(errNoChecksum)
525	}
526	return row
527}
528
529func ReadRowsWithHeader(r *bytes.Reader) (rows []*PlainBufferRow, err error) {
530	defer func() {
531		if err2 := recover(); err2 != nil {
532			if _, ok := err2.(error); ok {
533				err = err2.(error)
534			}
535			return
536		}
537	}()
538
539	// TODO: panic
540	if readRawLittleEndian32(r) != HEADER {
541		return nil, fmt.Errorf("Invalid header from plain buffer")
542	}
543
544	rows = make([]*PlainBufferRow, 0, 10)
545
546	for r.Len() > 0 {
547		rows = append(rows, readRow(r))
548	}
549
550	return rows, nil
551}
552
553func readRowExtension(r *bytes.Reader) *tablestore.RecordSequenceInfo {
554	readRawLittleEndian32(r) // useless
555	tag := readTag(r)
556	if tag != TAG_SEQ_INFO {
557		panic(errTag)
558	}
559
560	readRawLittleEndian32(r) // useless
561	tag = readTag(r)
562	if tag != TAG_SEQ_INFO_EPOCH {
563		panic(errTag)
564	}
565	epoch := readRawLittleEndian32(r)
566
567	tag = readTag(r)
568	if tag != TAG_SEQ_INFO_TS {
569		panic(errTag)
570	}
571	ts := readRawLittleEndian64(r)
572
573	tag = readTag(r)
574	if tag != TAG_SEQ_INFO_ROW_INDEX {
575		panic(errTag)
576	}
577	rowIndex := readRawLittleEndian32(r)
578
579	ext := tablestore.RecordSequenceInfo{}
580	ext.Epoch = epoch
581	ext.Timestamp = ts
582	ext.RowIndex = rowIndex
583	return &ext
584}
585