1package tablestore 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "fmt" 7 "io" 8 "math" 9) 10 11const ( 12 HEADER = 0x75 13 14 // tag type 15 TAG_ROW_PK = 0x1 16 TAG_ROW_DATA = 0x2 17 TAG_CELL = 0x3 18 TAG_CELL_NAME = 0x4 19 TAG_CELL_VALUE = 0x5 20 TAG_CELL_TYPE = 0x6 21 TAG_CELL_TIMESTAMP = 0x7 22 TAG_DELETE_ROW_MARKER = 0x8 23 TAG_ROW_CHECKSUM = 0x9 24 TAG_CELL_CHECKSUM = 0x0A 25 TAG_EXTENSION = 0x0B 26 TAG_SEQ_INFO = 0x0C 27 TAG_SEQ_INFO_EPOCH = 0x0D 28 TAG_SEQ_INFO_TS = 0x0E 29 TAG_SEQ_INFO_ROW_INDEX = 0x0F 30 31 // cell op type 32 DELETE_ALL_VERSION = 0x1 33 DELETE_ONE_VERSION = 0x3 34 INCREMENT = 0x4; 35 36 // variant type 37 VT_INTEGER = 0x0 38 VT_DOUBLE = 0x1 39 VT_BOOLEAN = 0x2 40 VT_STRING = 0x3 41 42 //public final static byte VT_NULL = 0x6; 43 VT_BLOB = 0x7 44 VT_INF_MIN = 0x9 45 VT_INF_MAX = 0xa 46 VT_AUTO_INCREMENT = 0xb 47 48 LITTLE_ENDIAN_32_SIZE = 4 49 LITTLE_ENDIAN_64_SIZE = 8 50) 51 52const spaceSize = 256 53 54var crc8Table = make([]byte, spaceSize) 55 56func init() { 57 for i := 0; i < spaceSize; i++ { 58 x := byte(i) 59 for j := 8; j > 0; j-- { 60 if (x & 0x80) != 0 { 61 x = (x << 1) ^ 0x07 62 } else { 63 x = (x << 1) ^ 0 64 } 65 } 66 crc8Table[i] = x 67 } 68} 69 70func crc8Byte(crc, in byte) byte { 71 return crc8Table[(crc^in)&0xff] 72} 73 74func crc8Int32(crc byte, in int32) byte { 75 for i := 0; i < 4; i++ { 76 crc = crc8Byte(crc, byte((in & 0xff))) 77 in >>= 8 78 } 79 80 return crc 81} 82 83func crc8Int64(crc byte, in int64) byte { 84 for i := 0; i < 8; i++ { 85 crc = crc8Byte(crc, byte((in & 0xff))) 86 in >>= 8 87 } 88 89 return crc 90} 91 92func crc8Bytes(crc byte, in []byte) byte { 93 for i := 0; i < len(in); i++ { 94 crc = crc8Byte(crc, in[i]) 95 } 96 97 return crc 98} 99 100func writeRawByte(w io.Writer, value byte) { 101 w.Write([]byte{value}) 102} 103 104/*func writeRawByteInt8(w io.Writer, value int) { 105 w.Write([]byte{byte(value)}) 106}*/ 107 108func writeRawLittleEndian32(w io.Writer, value int32) { 109 w.Write([]byte{byte((value) & 0xFF)}) 110 w.Write([]byte{byte((value >> 8) & 0xFF)}) 111 w.Write([]byte{byte((value >> 16) & 0xFF)}) 112 w.Write([]byte{byte((value >> 24) & 0xFF)}) 113} 114 115func writeRawLittleEndian64(w io.Writer, value int64) { 116 w.Write([]byte{byte((value) & 0xFF)}) 117 w.Write([]byte{byte((value >> 8) & 0xFF)}) 118 w.Write([]byte{byte((value >> 16) & 0xFF)}) 119 w.Write([]byte{byte((value >> 24) & 0xFF)}) 120 w.Write([]byte{byte((value >> 32) & 0xFF)}) 121 w.Write([]byte{byte((value >> 40) & 0xFF)}) 122 w.Write([]byte{byte((value >> 48) & 0xFF)}) 123 w.Write([]byte{byte((value >> 56) & 0xFF)}) 124} 125 126func writeDouble(w io.Writer, value float64) { 127 writeRawLittleEndian64(w, int64(math.Float64bits(value))) 128} 129 130func writeBoolean(w io.Writer, value bool) { 131 if value { 132 w.Write([]byte{byte(1)}) 133 } else { 134 w.Write([]byte{byte(0)}) 135 } 136} 137 138func writeBytes(w io.Writer, value []byte) { 139 w.Write(value) 140} 141 142func writeHeader(w io.Writer) { 143 writeRawLittleEndian32(w, HEADER) 144} 145 146func writeTag(w io.Writer, tag byte) { 147 writeRawByte(w, tag) 148} 149 150func writeCellName(w io.Writer, name []byte) { 151 writeTag(w, TAG_CELL_NAME) 152 writeRawLittleEndian32(w, int32(len(name))) 153 writeBytes(w, name) 154} 155 156type PlainBufferCell struct { 157 cellName []byte 158 cellValue *ColumnValue 159 cellTimestamp int64 160 cellType byte 161 ignoreValue bool 162 hasCellTimestamp bool 163 hasCellType bool 164} 165 166func (cell *PlainBufferCell) writeCell(w io.Writer) { 167 writeTag(w, TAG_CELL) 168 writeCellName(w, cell.cellName) 169 if cell.ignoreValue == false { 170 cell.cellValue.writeCellValue(w) 171 } 172 173 if cell.hasCellType { 174 writeTag(w, TAG_CELL_TYPE) 175 writeRawByte(w, cell.cellType) 176 } 177 178 if cell.hasCellTimestamp { 179 writeTag(w, TAG_CELL_TIMESTAMP) 180 writeRawLittleEndian64(w, cell.cellTimestamp) 181 } 182 183 writeTag(w, TAG_CELL_CHECKSUM) 184 writeRawByte(w, cell.getCheckSum(byte(0x0))) 185} 186 187func (cell *PlainBufferCell) getCheckSum(crc byte) byte { 188 crc = crc8Bytes(crc, cell.cellName) 189 if cell.ignoreValue == false { 190 crc = cell.cellValue.getCheckSum(crc) 191 } 192 193 if cell.hasCellTimestamp { 194 crc = crc8Int64(crc, cell.cellTimestamp) 195 } 196 if cell.hasCellType { 197 crc = crc8Byte(crc, cell.cellType) 198 } 199 return crc 200} 201 202type PlainBufferRow struct { 203 primaryKey []*PlainBufferCell 204 cells []*PlainBufferCell 205 hasDeleteMarker bool 206 extension *RecordSequenceInfo // optional 207} 208 209func (row *PlainBufferRow) writeRow(w io.Writer) { 210 /* pk */ 211 writeTag(w, TAG_ROW_PK) 212 for _, pk := range row.primaryKey { 213 pk.writeCell(w) 214 } 215 216 if len(row.cells) > 0 { 217 writeTag(w, TAG_ROW_DATA) 218 for _, cell := range row.cells { 219 cell.writeCell(w) 220 } 221 } 222 223 writeTag(w, TAG_ROW_CHECKSUM) 224 writeRawByte(w, row.getCheckSum(byte(0x0))) 225} 226 227func (row *PlainBufferRow) writeRowWithHeader(w io.Writer) { 228 writeHeader(w) 229 row.writeRow(w) 230} 231 232func (row *PlainBufferRow) getCheckSum(crc byte) byte { 233 for _, cell := range row.primaryKey { 234 crcCell := cell.getCheckSum(byte(0x0)) 235 crc = crc8Byte(crc, crcCell) 236 } 237 238 for _, cell := range row.cells { 239 crcCell := cell.getCheckSum(byte(0x0)) 240 crc = crc8Byte(crc, crcCell) 241 } 242 243 del := byte(0x0) 244 if row.hasDeleteMarker { 245 del = byte(0x1) 246 } 247 248 crc = crc8Byte(crc, del) 249 250 return crc 251} 252 253func readRawByte(r *bytes.Reader) byte { 254 if r.Len() == 0 { 255 panic(errUnexpectIoEnd) 256 } 257 258 b, _ := r.ReadByte() 259 260 return b 261} 262 263func readTag(r *bytes.Reader) int { 264 return int(readRawByte(r)) 265} 266 267func readRawLittleEndian64(r *bytes.Reader) int64 { 268 if r.Len() < 8 { 269 panic(errUnexpectIoEnd) 270 } 271 272 var v int64 273 binary.Read(r, binary.LittleEndian, &v) 274 275 return v 276} 277 278func readRawLittleEndian32(r *bytes.Reader) int32 { 279 if r.Len() < 4 { 280 panic(errUnexpectIoEnd) 281 } 282 283 var v int32 284 binary.Read(r, binary.LittleEndian, &v) 285 286 return v 287} 288 289func readBoolean(r *bytes.Reader) bool { 290 return readRawByte(r) != 0 291} 292 293func readBytes(r *bytes.Reader, size int32) []byte { 294 if int32(r.Len()) < size { 295 panic(errUnexpectIoEnd) 296 } 297 v := make([]byte, size) 298 r.Read(v) 299 return v 300} 301 302func readCellValue(r *bytes.Reader) *ColumnValue { 303 value := new(ColumnValue) 304 readRawLittleEndian32(r) 305 tp := readRawByte(r) 306 switch tp { 307 case VT_INTEGER: 308 value.Type = ColumnType_INTEGER 309 value.Value = readRawLittleEndian64(r) 310 case VT_DOUBLE: 311 value.Type = ColumnType_DOUBLE 312 value.Value = math.Float64frombits(uint64(readRawLittleEndian64(r))) 313 case VT_BOOLEAN: 314 value.Type = ColumnType_BOOLEAN 315 value.Value = readBoolean(r) 316 case VT_STRING: 317 value.Type = ColumnType_STRING 318 value.Value = string(readBytes(r, readRawLittleEndian32(r))) 319 case VT_BLOB: 320 value.Type = ColumnType_BINARY 321 value.Value = []byte(readBytes(r, readRawLittleEndian32(r))) 322 } 323 return value 324} 325 326func readCell(r *bytes.Reader) *PlainBufferCell { 327 cell := new(PlainBufferCell) 328 tag := readTag(r) 329 if tag != TAG_CELL_NAME { 330 panic(errTag) 331 } 332 333 cell.cellName = readBytes(r, readRawLittleEndian32(r)) 334 tag = readTag(r) 335 336 if tag == TAG_CELL_VALUE { 337 cell.cellValue = readCellValue(r) 338 tag = readTag(r) 339 } 340 if tag == TAG_CELL_TYPE { 341 readRawByte(r) 342 tag = readTag(r) 343 } 344 345 if tag == TAG_CELL_TIMESTAMP { 346 cell.cellTimestamp = readRawLittleEndian64(r) 347 tag = readTag(r) 348 } 349 350 if tag == TAG_CELL_CHECKSUM { 351 readRawByte(r) 352 } else { 353 panic(errNoChecksum) 354 } 355 356 return cell 357} 358 359func readRowPk(r *bytes.Reader) []*PlainBufferCell { 360 primaryKeyColumns := make([]*PlainBufferCell, 0, 4) 361 362 tag := readTag(r) 363 for tag == TAG_CELL { 364 primaryKeyColumns = append(primaryKeyColumns, readCell(r)) 365 tag = readTag(r) 366 } 367 368 r.Seek(-1, 1) 369 370 return primaryKeyColumns 371} 372 373func readRowData(r *bytes.Reader) []*PlainBufferCell { 374 columns := make([]*PlainBufferCell, 0, 10) 375 376 tag := readTag(r) 377 for tag == TAG_CELL { 378 columns = append(columns, readCell(r)) 379 tag = readTag(r) 380 } 381 382 r.Seek(-1, 1) 383 384 return columns 385} 386 387func readRow(r *bytes.Reader) *PlainBufferRow { 388 row := new(PlainBufferRow) 389 tag := readTag(r) 390 if tag == TAG_ROW_PK { 391 row.primaryKey = readRowPk(r) 392 tag = readTag(r) 393 } 394 395 if tag == TAG_ROW_DATA { 396 row.cells = readRowData(r) 397 tag = readTag(r) 398 } 399 400 if tag == TAG_DELETE_ROW_MARKER { 401 row.hasDeleteMarker = true 402 tag = readTag(r) 403 } 404 405 if tag == TAG_EXTENSION { 406 row.extension = readRowExtension(r) 407 tag = readTag(r) 408 } 409 410 if tag == TAG_ROW_CHECKSUM { 411 readRawByte(r) 412 } else { 413 panic(errNoChecksum) 414 } 415 return row 416} 417 418func readRowsWithHeader(r *bytes.Reader) (rows []*PlainBufferRow, err error) { 419 defer func() { 420 if err2 := recover(); err2 != nil { 421 if _, ok := err2.(error); ok { 422 err = err2.(error) 423 } 424 return 425 } 426 }() 427 428 // TODO: panic 429 if readRawLittleEndian32(r) != HEADER { 430 return nil, fmt.Errorf("Invalid header from plain buffer") 431 } 432 433 rows = make([]*PlainBufferRow, 0, 10) 434 435 for r.Len() > 0 { 436 rows = append(rows, readRow(r)) 437 } 438 439 return rows, nil 440} 441 442func readRowExtension(r *bytes.Reader) *RecordSequenceInfo { 443 readRawLittleEndian32(r) // useless 444 tag := readTag(r) 445 if tag != TAG_SEQ_INFO { 446 panic(errTag) 447 } 448 449 readRawLittleEndian32(r) // useless 450 tag = readTag(r) 451 if tag != TAG_SEQ_INFO_EPOCH { 452 panic(errTag) 453 } 454 epoch := readRawLittleEndian32(r) 455 456 tag = readTag(r) 457 if tag != TAG_SEQ_INFO_TS { 458 panic(errTag) 459 } 460 ts := readRawLittleEndian64(r) 461 462 tag = readTag(r) 463 if tag != TAG_SEQ_INFO_ROW_INDEX { 464 panic(errTag) 465 } 466 rowIndex := readRawLittleEndian32(r) 467 468 ext := RecordSequenceInfo{} 469 ext.Epoch = epoch 470 ext.Timestamp = ts 471 ext.RowIndex = rowIndex 472 return &ext 473} 474