1package protocol 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore" 8 "io" 9 "math" 10) 11 12const ( 13 HEADER = 0x75 14 15 // tag type 16 TAG_ROW_PK = 0x1 17 TAG_ROW_DATA = 0x2 18 TAG_CELL = 0x3 19 TAG_CELL_NAME = 0x4 20 TAG_CELL_VALUE = 0x5 21 TAG_CELL_TYPE = 0x6 22 TAG_CELL_TIMESTAMP = 0x7 23 TAG_DELETE_ROW_MARKER = 0x8 24 TAG_ROW_CHECKSUM = 0x9 25 TAG_CELL_CHECKSUM = 0x0A 26 TAG_EXTENSION = 0x0B 27 TAG_SEQ_INFO = 0x0C 28 TAG_SEQ_INFO_EPOCH = 0x0D 29 TAG_SEQ_INFO_TS = 0x0E 30 TAG_SEQ_INFO_ROW_INDEX = 0x0F 31 32 // cell op type 33 DELETE_ALL_VERSION = 0x1 34 DELETE_ONE_VERSION = 0x3 35 36 // variant type 37 VT_INTEGER = 0x0 38 VT_DOUBLE = 0x1 39 VT_BOOLEAN = 0x2 40 VT_STRING = 0x3 41 42 //public final static byte VT_NULL = 0x6; 43 VT_BLOB = 0x7 44 VT_INF_MIN = 0x9 45 VT_INF_MAX = 0xa 46 VT_AUTO_INCREMENT = 0xb 47 48 LITTLE_ENDIAN_32_SIZE = 4 49 LITTLE_ENDIAN_64_SIZE = 8 50) 51 52const spaceSize = 256 53 54var crc8Table = make([]byte, spaceSize) 55 56func init() { 57 for i := 0; i < spaceSize; i++ { 58 x := byte(i) 59 for j := 8; j > 0; j-- { 60 if (x & 0x80) != 0 { 61 x = (x << 1) ^ 0x07 62 } else { 63 x = (x << 1) ^ 0 64 } 65 } 66 crc8Table[i] = x 67 } 68} 69 70func crc8Byte(crc, in byte) byte { 71 return crc8Table[(crc^in)&0xff] 72} 73 74func crc8Int32(crc byte, in int32) byte { 75 for i := 0; i < 4; i++ { 76 crc = crc8Byte(crc, byte((in & 0xff))) 77 in >>= 8 78 } 79 80 return crc 81} 82 83func crc8Int64(crc byte, in int64) byte { 84 for i := 0; i < 8; i++ { 85 crc = crc8Byte(crc, byte((in & 0xff))) 86 in >>= 8 87 } 88 89 return crc 90} 91 92func crc8Bytes(crc byte, in []byte) byte { 93 for i := 0; i < len(in); i++ { 94 crc = crc8Byte(crc, in[i]) 95 } 96 97 return crc 98} 99 100func writeRawByte(w io.Writer, value byte) { 101 w.Write([]byte{value}) 102} 103 104/*func writeRawByteInt8(w io.Writer, value int) { 105 w.Write([]byte{byte(value)}) 106}*/ 107 108func writeRawLittleEndian32(w io.Writer, value int32) { 109 w.Write([]byte{byte((value) & 0xFF)}) 110 w.Write([]byte{byte((value >> 8) & 0xFF)}) 111 w.Write([]byte{byte((value >> 16) & 0xFF)}) 112 w.Write([]byte{byte((value >> 24) & 0xFF)}) 113} 114 115func writeRawLittleEndian64(w io.Writer, value int64) { 116 w.Write([]byte{byte((value) & 0xFF)}) 117 w.Write([]byte{byte((value >> 8) & 0xFF)}) 118 w.Write([]byte{byte((value >> 16) & 0xFF)}) 119 w.Write([]byte{byte((value >> 24) & 0xFF)}) 120 w.Write([]byte{byte((value >> 32) & 0xFF)}) 121 w.Write([]byte{byte((value >> 40) & 0xFF)}) 122 w.Write([]byte{byte((value >> 48) & 0xFF)}) 123 w.Write([]byte{byte((value >> 56) & 0xFF)}) 124} 125 126func writeDouble(w io.Writer, value float64) { 127 writeRawLittleEndian64(w, int64(math.Float64bits(value))) 128} 129 130func writeBoolean(w io.Writer, value bool) { 131 if value { 132 w.Write([]byte{byte(1)}) 133 } else { 134 w.Write([]byte{byte(0)}) 135 } 136} 137 138func writeBytes(w io.Writer, value []byte) { 139 w.Write(value) 140} 141 142func writeHeader(w io.Writer) { 143 writeRawLittleEndian32(w, HEADER) 144} 145 146func writeTag(w io.Writer, tag byte) { 147 writeRawByte(w, tag) 148} 149 150func writeCellName(w io.Writer, name []byte) { 151 writeTag(w, TAG_CELL_NAME) 152 writeRawLittleEndian32(w, int32(len(name))) 153 writeBytes(w, name) 154} 155 156type ColumnType int32 157 158const ( 159 ColumnType_STRING ColumnType = 1 160 ColumnType_INTEGER ColumnType = 2 161 ColumnType_BOOLEAN ColumnType = 3 162 ColumnType_DOUBLE ColumnType = 4 163 ColumnType_BINARY ColumnType = 5 164 ColumnType_MIN ColumnType = 6 165 ColumnType_MAX ColumnType = 7 166) 167 168type ColumnValue struct { 169 Type ColumnType 170 Value interface{} 171} 172 173func (cv *ColumnValue) writeCellValue(w io.Writer) { 174 writeTag(w, TAG_CELL_VALUE) 175 if cv == nil { 176 writeRawLittleEndian32(w, 1) 177 writeRawByte(w, VT_AUTO_INCREMENT) 178 return 179 } 180 181 switch cv.Type { 182 case ColumnType_STRING: 183 v := cv.Value.(string) 184 185 writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_32_SIZE+1+len(v))) // length + type + value 186 writeRawByte(w, VT_STRING) 187 writeRawLittleEndian32(w, int32(len(v))) 188 writeBytes(w, []byte(v)) 189 190 case ColumnType_INTEGER: 191 v := cv.Value.(int64) 192 writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_64_SIZE+1)) 193 writeRawByte(w, VT_INTEGER) 194 writeRawLittleEndian64(w, v) 195 case ColumnType_BOOLEAN: 196 v := cv.Value.(bool) 197 writeRawLittleEndian32(w, 2) 198 writeRawByte(w, VT_BOOLEAN) 199 writeBoolean(w, v) 200 201 case ColumnType_DOUBLE: 202 v := cv.Value.(float64) 203 204 writeRawLittleEndian32(w, LITTLE_ENDIAN_64_SIZE+1) 205 writeRawByte(w, VT_DOUBLE) 206 writeDouble(w, v) 207 208 case ColumnType_BINARY: 209 v := cv.Value.([]byte) 210 211 writeRawLittleEndian32(w, int32(LITTLE_ENDIAN_32_SIZE+1+len(v))) // length + type + value 212 writeRawByte(w, VT_BLOB) 213 writeRawLittleEndian32(w, int32(len(v))) 214 writeBytes(w, v) 215 } 216} 217 218func (cv *ColumnValue) getCheckSum(crc byte) byte { 219 if cv == nil { 220 return crc8Byte(crc, VT_AUTO_INCREMENT) 221 } 222 223 switch cv.Type { 224 case ColumnType_STRING: 225 v := cv.Value.(string) 226 crc = crc8Byte(crc, VT_STRING) 227 crc = crc8Int32(crc, int32(len(v))) 228 crc = crc8Bytes(crc, []byte(v)) 229 case ColumnType_INTEGER: 230 v := cv.Value.(int64) 231 crc = crc8Byte(crc, VT_INTEGER) 232 crc = crc8Int64(crc, v) 233 case ColumnType_BOOLEAN: 234 v := cv.Value.(bool) 235 crc = crc8Byte(crc, VT_BOOLEAN) 236 if v { 237 crc = crc8Byte(crc, 0x1) 238 } else { 239 crc = crc8Byte(crc, 0x0) 240 } 241 242 case ColumnType_DOUBLE: 243 v := cv.Value.(float64) 244 crc = crc8Byte(crc, VT_DOUBLE) 245 crc = crc8Int64(crc, int64(math.Float64bits(v))) 246 case ColumnType_BINARY: 247 v := cv.Value.([]byte) 248 crc = crc8Byte(crc, VT_BLOB) 249 crc = crc8Int32(crc, int32(len(v))) 250 crc = crc8Bytes(crc, v) 251 } 252 253 return crc 254} 255 256type PlainBufferCell struct { 257 CellName []byte 258 CellValue *ColumnValue 259 CellTimestamp int64 260 CellType byte 261 IgnoreValue bool 262 HasCellTimestamp bool 263 HasCellType bool 264} 265 266func (cell *PlainBufferCell) writeCell(w io.Writer) { 267 writeTag(w, TAG_CELL) 268 writeCellName(w, cell.CellName) 269 if cell.IgnoreValue == false { 270 cell.CellValue.writeCellValue(w) 271 } 272 273 if cell.HasCellType { 274 writeTag(w, TAG_CELL_TYPE) 275 writeRawByte(w, cell.CellType) 276 } 277 278 if cell.HasCellTimestamp { 279 writeTag(w, TAG_CELL_TIMESTAMP) 280 writeRawLittleEndian64(w, cell.CellTimestamp) 281 } 282 283 writeTag(w, TAG_CELL_CHECKSUM) 284 writeRawByte(w, cell.getCheckSum(byte(0x0))) 285} 286 287func (cell *PlainBufferCell) getCheckSum(crc byte) byte { 288 crc = crc8Bytes(crc, cell.CellName) 289 if cell.IgnoreValue == false { 290 crc = cell.CellValue.getCheckSum(crc) 291 } 292 293 if cell.HasCellTimestamp { 294 crc = crc8Int64(crc, cell.CellTimestamp) 295 } 296 if cell.HasCellType { 297 crc = crc8Byte(crc, cell.CellType) 298 } 299 return crc 300} 301 302type PlainBufferRow struct { 303 PrimaryKey []*PlainBufferCell 304 Cells []*PlainBufferCell 305 HasDeleteMarker bool 306 Extension *tablestore.RecordSequenceInfo // optional 307} 308 309func (row *PlainBufferRow) writeRow(w io.Writer) { 310 /* pk */ 311 writeTag(w, TAG_ROW_PK) 312 for _, pk := range row.PrimaryKey { 313 pk.writeCell(w) 314 } 315 316 if len(row.Cells) > 0 { 317 writeTag(w, TAG_ROW_DATA) 318 for _, cell := range row.Cells { 319 cell.writeCell(w) 320 } 321 } 322 323 writeTag(w, TAG_ROW_CHECKSUM) 324 writeRawByte(w, row.getCheckSum(byte(0x0))) 325} 326 327func (row *PlainBufferRow) writeRowWithHeader(w io.Writer) { 328 writeHeader(w) 329 row.writeRow(w) 330} 331 332func (row *PlainBufferRow) getCheckSum(crc byte) byte { 333 for _, cell := range row.PrimaryKey { 334 crcCell := cell.getCheckSum(byte(0x0)) 335 crc = crc8Byte(crc, crcCell) 336 } 337 338 for _, cell := range row.Cells { 339 crcCell := cell.getCheckSum(byte(0x0)) 340 crc = crc8Byte(crc, crcCell) 341 } 342 343 del := byte(0x0) 344 if row.HasDeleteMarker { 345 del = byte(0x1) 346 } 347 348 crc = crc8Byte(crc, del) 349 350 return crc 351} 352 353func readRawByte(r *bytes.Reader) byte { 354 if r.Len() == 0 { 355 panic(errUnexpectIoEnd) 356 } 357 358 b, _ := r.ReadByte() 359 360 return b 361} 362 363func readTag(r *bytes.Reader) int { 364 return int(readRawByte(r)) 365} 366 367func readRawLittleEndian64(r *bytes.Reader) int64 { 368 if r.Len() < 8 { 369 panic(errUnexpectIoEnd) 370 } 371 372 var v int64 373 binary.Read(r, binary.LittleEndian, &v) 374 375 return v 376} 377 378func readRawLittleEndian32(r *bytes.Reader) int32 { 379 if r.Len() < 4 { 380 panic(errUnexpectIoEnd) 381 } 382 383 var v int32 384 binary.Read(r, binary.LittleEndian, &v) 385 386 return v 387} 388 389func readBoolean(r *bytes.Reader) bool { 390 return readRawByte(r) != 0 391} 392 393func readBytes(r *bytes.Reader, size int32) []byte { 394 if int32(r.Len()) < size { 395 panic(errUnexpectIoEnd) 396 } 397 v := make([]byte, size) 398 r.Read(v) 399 return v 400} 401 402func readCellValue(r *bytes.Reader) *ColumnValue { 403 value := new(ColumnValue) 404 readRawLittleEndian32(r) 405 tp := readRawByte(r) 406 switch tp { 407 case VT_INTEGER: 408 value.Type = ColumnType_INTEGER 409 value.Value = readRawLittleEndian64(r) 410 case VT_DOUBLE: 411 value.Type = ColumnType_DOUBLE 412 value.Value = math.Float64frombits(uint64(readRawLittleEndian64(r))) 413 case VT_BOOLEAN: 414 value.Type = ColumnType_BOOLEAN 415 value.Value = readBoolean(r) 416 case VT_STRING: 417 value.Type = ColumnType_STRING 418 value.Value = string(readBytes(r, readRawLittleEndian32(r))) 419 case VT_BLOB: 420 value.Type = ColumnType_BINARY 421 value.Value = []byte(readBytes(r, readRawLittleEndian32(r))) 422 case VT_INF_MAX: 423 value.Type = ColumnType_MAX 424 case VT_INF_MIN: 425 value.Type = ColumnType_MIN 426 } 427 return value 428} 429 430func readCell(r *bytes.Reader) *PlainBufferCell { 431 cell := new(PlainBufferCell) 432 tag := readTag(r) 433 if tag != TAG_CELL_NAME { 434 panic(errTag) 435 } 436 437 cell.CellName = readBytes(r, readRawLittleEndian32(r)) 438 tag = readTag(r) 439 440 if tag == TAG_CELL_VALUE { 441 cell.CellValue = readCellValue(r) 442 tag = readTag(r) 443 } 444 if tag == TAG_CELL_TYPE { 445 b := readRawByte(r) 446 switch b { 447 case DELETE_ALL_VERSION: 448 cell.CellType = DELETE_ALL_VERSION 449 case DELETE_ONE_VERSION: 450 cell.CellType = DELETE_ONE_VERSION 451 } 452 tag = readTag(r) 453 } 454 455 if tag == TAG_CELL_TIMESTAMP { 456 cell.CellTimestamp = readRawLittleEndian64(r) 457 cell.HasCellTimestamp = true 458 tag = readTag(r) 459 } 460 461 if tag == TAG_CELL_CHECKSUM { 462 readRawByte(r) 463 } else { 464 panic(errNoChecksum) 465 } 466 467 return cell 468} 469 470func readRowPk(r *bytes.Reader) []*PlainBufferCell { 471 primaryKeyColumns := make([]*PlainBufferCell, 0, 4) 472 if readTag(r) != TAG_ROW_PK { 473 panic(errTag) 474 } 475 476 tag := readTag(r) 477 for tag == TAG_CELL { 478 primaryKeyColumns = append(primaryKeyColumns, readCell(r)) 479 tag = readTag(r) 480 } 481 482 r.Seek(-1, 1) 483 484 return primaryKeyColumns 485} 486 487func readRowData(r *bytes.Reader) []*PlainBufferCell { 488 columns := make([]*PlainBufferCell, 0, 10) 489 490 tag := readTag(r) 491 for tag == TAG_CELL { 492 columns = append(columns, readCell(r)) 493 tag = readTag(r) 494 } 495 496 r.Seek(-1, 1) 497 498 return columns 499} 500 501func readRow(r *bytes.Reader) *PlainBufferRow { 502 row := new(PlainBufferRow) 503 row.PrimaryKey = readRowPk(r) 504 tag := readTag(r) 505 506 if tag == TAG_ROW_DATA { 507 row.Cells = readRowData(r) 508 tag = readTag(r) 509 } 510 511 if tag == TAG_DELETE_ROW_MARKER { 512 row.HasDeleteMarker = true 513 tag = readTag(r) 514 } 515 516 if tag == TAG_EXTENSION { 517 row.Extension = readRowExtension(r) 518 tag = readTag(r) 519 } 520 521 if tag == TAG_ROW_CHECKSUM { 522 readRawByte(r) 523 } else { 524 panic(errNoChecksum) 525 } 526 return row 527} 528 529func ReadRowsWithHeader(r *bytes.Reader) (rows []*PlainBufferRow, err error) { 530 defer func() { 531 if err2 := recover(); err2 != nil { 532 if _, ok := err2.(error); ok { 533 err = err2.(error) 534 } 535 return 536 } 537 }() 538 539 // TODO: panic 540 if readRawLittleEndian32(r) != HEADER { 541 return nil, fmt.Errorf("Invalid header from plain buffer") 542 } 543 544 rows = make([]*PlainBufferRow, 0, 10) 545 546 for r.Len() > 0 { 547 rows = append(rows, readRow(r)) 548 } 549 550 return rows, nil 551} 552 553func readRowExtension(r *bytes.Reader) *tablestore.RecordSequenceInfo { 554 readRawLittleEndian32(r) // useless 555 tag := readTag(r) 556 if tag != TAG_SEQ_INFO { 557 panic(errTag) 558 } 559 560 readRawLittleEndian32(r) // useless 561 tag = readTag(r) 562 if tag != TAG_SEQ_INFO_EPOCH { 563 panic(errTag) 564 } 565 epoch := readRawLittleEndian32(r) 566 567 tag = readTag(r) 568 if tag != TAG_SEQ_INFO_TS { 569 panic(errTag) 570 } 571 ts := readRawLittleEndian64(r) 572 573 tag = readTag(r) 574 if tag != TAG_SEQ_INFO_ROW_INDEX { 575 panic(errTag) 576 } 577 rowIndex := readRawLittleEndian32(r) 578 579 ext := tablestore.RecordSequenceInfo{} 580 ext.Epoch = epoch 581 ext.Timestamp = ts 582 ext.RowIndex = rowIndex 583 return &ext 584} 585