1package tablestore 2 3import ( 4 "fmt" 5 "github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol" 6 "github.com/golang/protobuf/proto" 7 "math/rand" 8 "net/http" 9 "strconv" 10 "strings" 11 "time" 12 //"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore" 13) 14 15// @class TableStoreClient 16// The TableStoreClient, which will connect OTS service for authorization, create/list/ 17// delete tables/table groups, to get/put/delete a row. 18// Note: TableStoreClient is thread-safe. 19// TableStoreClient的功能包括连接OTS服务进行验证、创建/列出/删除表或表组、插入/获取/ 20// 删除/更新行数据 21type TableStoreClient struct { 22 endPoint string 23 instanceName string 24 accessKeyId string 25 accessKeySecret string 26 securityToken string 27 28 httpClient IHttpClient 29 config *TableStoreConfig 30 random *rand.Rand 31} 32 33type ClientOption func(*TableStoreClient) 34 35type TableStoreHttpClient struct { 36 httpClient *http.Client 37} 38 39// use this to mock http.client for testing 40type IHttpClient interface { 41 Do(*http.Request) (*http.Response, error) 42 New(*http.Client) 43} 44 45func (httpClient *TableStoreHttpClient) Do(req *http.Request) (*http.Response, error) { 46 return httpClient.httpClient.Do(req) 47} 48 49func (httpClient *TableStoreHttpClient) New(client *http.Client) { 50 httpClient.httpClient = client 51} 52 53type HTTPTimeout struct { 54 ConnectionTimeout time.Duration 55 RequestTimeout time.Duration 56} 57 58type TableStoreConfig struct { 59 RetryTimes uint 60 MaxRetryTime time.Duration 61 HTTPTimeout HTTPTimeout 62 MaxIdleConnections int 63} 64 65func NewDefaultTableStoreConfig() *TableStoreConfig { 66 httpTimeout := &HTTPTimeout{ 67 ConnectionTimeout: time.Second * 15, 68 RequestTimeout: time.Second * 30} 69 config := &TableStoreConfig{ 70 RetryTimes: 10, 71 HTTPTimeout: *httpTimeout, 72 MaxRetryTime: time.Second * 5, 73 MaxIdleConnections: 2000} 74 return config 75} 76 77type CreateTableRequest struct { 78 TableMeta *TableMeta 79 TableOption *TableOption 80 ReservedThroughput *ReservedThroughput 81 StreamSpec *StreamSpecification 82 IndexMetas []*IndexMeta 83} 84 85type CreateIndexRequest struct { 86 MainTableName string 87 IndexMeta *IndexMeta 88 IncludeBaseData bool 89} 90 91type DeleteIndexRequest struct { 92 MainTableName string 93 IndexName string 94} 95 96type ResponseInfo struct { 97 RequestId string 98} 99 100type CreateTableResponse struct { 101 ResponseInfo 102} 103 104type CreateIndexResponse struct { 105 ResponseInfo 106} 107 108type DeleteIndexResponse struct { 109 ResponseInfo 110} 111 112type DeleteTableResponse struct { 113 ResponseInfo 114} 115 116type TableMeta struct { 117 TableName string 118 SchemaEntry []*PrimaryKeySchema 119 DefinedColumns []*DefinedColumnSchema 120} 121 122type PrimaryKeySchema struct { 123 Name *string 124 Type *PrimaryKeyType 125 Option *PrimaryKeyOption 126} 127 128type PrimaryKey struct { 129 PrimaryKeys []*PrimaryKeyColumn 130} 131 132type TableOption struct { 133 TimeToAlive, MaxVersion int 134} 135 136type ReservedThroughput struct { 137 Readcap, Writecap int 138} 139 140type ListTableResponse struct { 141 TableNames []string 142 ResponseInfo 143} 144 145type DeleteTableRequest struct { 146 TableName string 147} 148 149type DescribeTableRequest struct { 150 TableName string 151} 152 153type DescribeTableResponse struct { 154 TableMeta *TableMeta 155 TableOption *TableOption 156 ReservedThroughput *ReservedThroughput 157 StreamDetails *StreamDetails 158 IndexMetas []*IndexMeta 159 ResponseInfo 160} 161 162type UpdateTableRequest struct { 163 TableName string 164 TableOption *TableOption 165 ReservedThroughput *ReservedThroughput 166 StreamSpec *StreamSpecification 167} 168 169type UpdateTableResponse struct { 170 TableOption *TableOption 171 ReservedThroughput *ReservedThroughput 172 StreamDetails *StreamDetails 173 ResponseInfo 174} 175 176type ConsumedCapacityUnit struct { 177 Read int32 178 Write int32 179} 180 181type PutRowResponse struct { 182 ConsumedCapacityUnit *ConsumedCapacityUnit 183 PrimaryKey PrimaryKey 184 ResponseInfo 185} 186 187type DeleteRowResponse struct { 188 ConsumedCapacityUnit *ConsumedCapacityUnit 189 ResponseInfo 190} 191 192type UpdateRowResponse struct { 193 Columns []*AttributeColumn 194 ConsumedCapacityUnit *ConsumedCapacityUnit 195 ResponseInfo 196} 197 198type PrimaryKeyType int32 199 200const ( 201 PrimaryKeyType_INTEGER PrimaryKeyType = 1 202 PrimaryKeyType_STRING PrimaryKeyType = 2 203 PrimaryKeyType_BINARY PrimaryKeyType = 3 204) 205 206const ( 207 DefaultRetryInterval = 10 208 MaxRetryInterval = 320 209) 210 211type PrimaryKeyOption int32 212 213const ( 214 NONE PrimaryKeyOption = 0 215 AUTO_INCREMENT PrimaryKeyOption = 1 216 MIN PrimaryKeyOption = 2 217 MAX PrimaryKeyOption = 3 218) 219 220type PrimaryKeyColumn struct { 221 ColumnName string 222 Value interface{} 223 PrimaryKeyOption PrimaryKeyOption 224} 225 226func (this *PrimaryKeyColumn) String() string { 227 xs := make([]string, 0) 228 xs = append(xs, fmt.Sprintf("\"Name\": \"%s\"", this.ColumnName)) 229 switch this.PrimaryKeyOption { 230 case NONE: 231 xs = append(xs, fmt.Sprintf("\"Value\": \"%s\"", this.Value)) 232 case MIN: 233 xs = append(xs, "\"Value\": -inf") 234 case MAX: 235 xs = append(xs, "\"Value\": +inf") 236 case AUTO_INCREMENT: 237 xs = append(xs, "\"Value\": auto-incr") 238 } 239 return fmt.Sprintf("{%s}", strings.Join(xs, ", ")) 240} 241 242type AttributeColumn struct { 243 ColumnName string 244 Value interface{} 245 Timestamp int64 246} 247 248type TimeRange struct { 249 Start int64 250 End int64 251 Specific int64 252} 253 254type ColumnToUpdate struct { 255 ColumnName string 256 Type byte 257 Timestamp int64 258 HasType bool 259 HasTimestamp bool 260 IgnoreValue bool 261 Value interface{} 262} 263 264type RowExistenceExpectation int 265 266const ( 267 RowExistenceExpectation_IGNORE RowExistenceExpectation = 0 268 RowExistenceExpectation_EXPECT_EXIST RowExistenceExpectation = 1 269 RowExistenceExpectation_EXPECT_NOT_EXIST RowExistenceExpectation = 2 270) 271 272type ComparatorType int32 273 274const ( 275 CT_EQUAL ComparatorType = 1 276 CT_NOT_EQUAL ComparatorType = 2 277 CT_GREATER_THAN ComparatorType = 3 278 CT_GREATER_EQUAL ComparatorType = 4 279 CT_LESS_THAN ComparatorType = 5 280 CT_LESS_EQUAL ComparatorType = 6 281) 282 283type LogicalOperator int32 284 285const ( 286 LO_NOT LogicalOperator = 1 287 LO_AND LogicalOperator = 2 288 LO_OR LogicalOperator = 3 289) 290 291type FilterType int32 292 293const ( 294 FT_SINGLE_COLUMN_VALUE FilterType = 1 295 FT_COMPOSITE_COLUMN_VALUE FilterType = 2 296 FT_COLUMN_PAGINATION FilterType = 3 297) 298 299type ColumnFilter interface { 300 Serialize() []byte 301 ToFilter() *otsprotocol.Filter 302} 303 304type VariantType int32 305 306const ( 307 Variant_INTEGER VariantType = 0; 308 Variant_DOUBLE VariantType = 1; 309 //VT_BOOLEAN = 2; 310 Variant_STRING VariantType = 3; 311) 312 313type ValueTransferRule struct { 314 Regex string 315 Cast_type VariantType 316} 317 318type SingleColumnCondition struct { 319 Comparator *ComparatorType 320 ColumnName *string 321 ColumnValue interface{} //[]byte 322 FilterIfMissing bool 323 LatestVersionOnly bool 324 TransferRule *ValueTransferRule 325} 326 327type ReturnType int32 328 329const ( 330 ReturnType_RT_NONE ReturnType = 0 331 ReturnType_RT_PK ReturnType = 1 332 ReturnType_RT_AFTER_MODIFY ReturnType = 2 333) 334 335type PaginationFilter struct { 336 Offset int32 337 Limit int32 338} 339 340type CompositeColumnValueFilter struct { 341 Operator LogicalOperator 342 Filters []ColumnFilter 343} 344 345func (ccvfilter *CompositeColumnValueFilter) Serialize() []byte { 346 result, _ := proto.Marshal(ccvfilter.ToFilter()) 347 return result 348} 349 350func (ccvfilter *CompositeColumnValueFilter) ToFilter() *otsprotocol.Filter { 351 compositefilter := NewCompositeFilter(ccvfilter.Filters, ccvfilter.Operator) 352 compositeFilterToBytes, _ := proto.Marshal(compositefilter) 353 filter := new(otsprotocol.Filter) 354 filter.Type = otsprotocol.FilterType_FT_COMPOSITE_COLUMN_VALUE.Enum() 355 filter.Filter = compositeFilterToBytes 356 return filter 357} 358 359func (ccvfilter *CompositeColumnValueFilter) AddFilter(filter ColumnFilter) { 360 ccvfilter.Filters = append(ccvfilter.Filters, filter) 361} 362 363func (condition *SingleColumnCondition) ToFilter() *otsprotocol.Filter { 364 singlefilter := NewSingleColumnValueFilter(condition) 365 singleFilterToBytes, _ := proto.Marshal(singlefilter) 366 filter := new(otsprotocol.Filter) 367 filter.Type = otsprotocol.FilterType_FT_SINGLE_COLUMN_VALUE.Enum() 368 filter.Filter = singleFilterToBytes 369 return filter 370} 371 372func (condition *SingleColumnCondition) Serialize() []byte { 373 result, _ := proto.Marshal(condition.ToFilter()) 374 return result 375} 376 377func (pageFilter *PaginationFilter) ToFilter() *otsprotocol.Filter { 378 compositefilter := NewPaginationFilter(pageFilter) 379 compositeFilterToBytes, _ := proto.Marshal(compositefilter) 380 filter := new(otsprotocol.Filter) 381 filter.Type = otsprotocol.FilterType_FT_COLUMN_PAGINATION.Enum() 382 filter.Filter = compositeFilterToBytes 383 return filter 384} 385 386func (pageFilter *PaginationFilter) Serialize() []byte { 387 result, _ := proto.Marshal(pageFilter.ToFilter()) 388 return result 389} 390 391func NewTableOptionWithMaxVersion(maxVersion int) *TableOption { 392 tableOption := new(TableOption) 393 tableOption.TimeToAlive = -1 394 tableOption.MaxVersion = maxVersion 395 return tableOption 396} 397 398func NewTableOption(timeToAlive int, maxVersion int) *TableOption { 399 tableOption := new(TableOption) 400 tableOption.TimeToAlive = timeToAlive 401 tableOption.MaxVersion = maxVersion 402 return tableOption 403} 404 405type RowCondition struct { 406 RowExistenceExpectation RowExistenceExpectation 407 ColumnCondition ColumnFilter 408} 409 410type PutRowChange struct { 411 TableName string 412 PrimaryKey *PrimaryKey 413 Columns []AttributeColumn 414 Condition *RowCondition 415 ReturnType ReturnType 416 TransactionId *string 417} 418 419type PutRowRequest struct { 420 PutRowChange *PutRowChange 421} 422 423type DeleteRowChange struct { 424 TableName string 425 PrimaryKey *PrimaryKey 426 Condition *RowCondition 427 TransactionId *string 428} 429 430type DeleteRowRequest struct { 431 DeleteRowChange *DeleteRowChange 432} 433 434type SingleRowQueryCriteria struct { 435 ColumnsToGet []string 436 TableName string 437 PrimaryKey *PrimaryKey 438 MaxVersion int32 439 TimeRange *TimeRange 440 Filter ColumnFilter 441 StartColumn *string 442 EndColumn *string 443 TransactionId *string 444} 445 446type UpdateRowChange struct { 447 TableName string 448 PrimaryKey *PrimaryKey 449 Columns []ColumnToUpdate 450 Condition *RowCondition 451 TransactionId *string 452 ReturnType ReturnType 453 ColumnNamesToReturn []string 454} 455 456type UpdateRowRequest struct { 457 UpdateRowChange *UpdateRowChange 458} 459 460func (rowQueryCriteria *SingleRowQueryCriteria) AddColumnToGet(columnName string) { 461 rowQueryCriteria.ColumnsToGet = append(rowQueryCriteria.ColumnsToGet, columnName) 462} 463 464func (rowQueryCriteria *SingleRowQueryCriteria) SetStartColumn(columnName string) { 465 rowQueryCriteria.StartColumn = &columnName 466} 467 468func (rowQueryCriteria *SingleRowQueryCriteria) SetEndtColumn(columnName string) { 469 rowQueryCriteria.EndColumn = &columnName 470} 471 472func (rowQueryCriteria *SingleRowQueryCriteria) getColumnsToGet() []string { 473 return rowQueryCriteria.ColumnsToGet 474} 475 476func (rowQueryCriteria *MultiRowQueryCriteria) AddColumnToGet(columnName string) { 477 rowQueryCriteria.ColumnsToGet = append(rowQueryCriteria.ColumnsToGet, columnName) 478} 479 480func (rowQueryCriteria *RangeRowQueryCriteria) AddColumnToGet(columnName string) { 481 rowQueryCriteria.ColumnsToGet = append(rowQueryCriteria.ColumnsToGet, columnName) 482} 483 484func (rowQueryCriteria *MultiRowQueryCriteria) AddRow(pk *PrimaryKey) { 485 rowQueryCriteria.PrimaryKey = append(rowQueryCriteria.PrimaryKey, pk) 486} 487 488type GetRowRequest struct { 489 SingleRowQueryCriteria *SingleRowQueryCriteria 490} 491 492type MultiRowQueryCriteria struct { 493 PrimaryKey []*PrimaryKey 494 ColumnsToGet []string 495 TableName string 496 MaxVersion int 497 TimeRange *TimeRange 498 Filter ColumnFilter 499 StartColumn *string 500 EndColumn *string 501} 502 503type BatchGetRowRequest struct { 504 MultiRowQueryCriteria []*MultiRowQueryCriteria 505} 506 507type ColumnMap struct { 508 Columns map[string][]*AttributeColumn 509 columnsKey []string 510} 511 512type GetRowResponse struct { 513 PrimaryKey PrimaryKey 514 Columns []*AttributeColumn 515 ConsumedCapacityUnit *ConsumedCapacityUnit 516 columnMap *ColumnMap 517 ResponseInfo 518} 519 520type Error struct { 521 Code string 522 Message string 523} 524 525type RowResult struct { 526 TableName string 527 IsSucceed bool 528 Error Error 529 PrimaryKey PrimaryKey 530 Columns []*AttributeColumn 531 ConsumedCapacityUnit *ConsumedCapacityUnit 532 Index int32 533} 534 535type RowChange interface { 536 Serialize() []byte 537 getOperationType() otsprotocol.OperationType 538 getCondition() *otsprotocol.Condition 539 GetTableName() string 540} 541 542type BatchGetRowResponse struct { 543 TableToRowsResult map[string][]RowResult 544 ResponseInfo 545} 546 547type BatchWriteRowRequest struct { 548 RowChangesGroupByTable map[string][]RowChange 549} 550 551type BatchWriteRowResponse struct { 552 TableToRowsResult map[string][]RowResult 553 ResponseInfo 554} 555 556type Direction int32 557 558const ( 559 FORWARD Direction = 0 560 BACKWARD Direction = 1 561) 562 563type RangeRowQueryCriteria struct { 564 TableName string 565 StartPrimaryKey *PrimaryKey 566 EndPrimaryKey *PrimaryKey 567 ColumnsToGet []string 568 MaxVersion int32 569 TimeRange *TimeRange 570 Filter ColumnFilter 571 Direction Direction 572 Limit int32 573 StartColumn *string 574 EndColumn *string 575 TransactionId *string 576} 577 578type GetRangeRequest struct { 579 RangeRowQueryCriteria *RangeRowQueryCriteria 580} 581 582type Row struct { 583 PrimaryKey *PrimaryKey 584 Columns []*AttributeColumn 585} 586 587type GetRangeResponse struct { 588 Rows []*Row 589 ConsumedCapacityUnit *ConsumedCapacityUnit 590 NextStartPrimaryKey *PrimaryKey 591 ResponseInfo 592} 593 594type ListStreamRequest struct { 595 TableName *string 596} 597 598type Stream struct { 599 Id *StreamId 600 TableName *string 601 CreationTime int64 602} 603 604type ListStreamResponse struct { 605 Streams []Stream 606 ResponseInfo 607} 608 609type StreamSpecification struct { 610 EnableStream bool 611 ExpirationTime int32 // must be positive. in hours 612} 613 614type StreamDetails struct { 615 EnableStream bool 616 StreamId *StreamId // nil when stream is disabled. 617 ExpirationTime int32 // in hours 618 LastEnableTime int64 // the last time stream is enabled, in usec 619} 620 621type DescribeStreamRequest struct { 622 StreamId *StreamId // required 623 InclusiveStartShardId *ShardId // optional 624 ShardLimit *int32 // optional 625} 626 627type DescribeStreamResponse struct { 628 StreamId *StreamId // required 629 ExpirationTime int32 // in hours 630 TableName *string // required 631 CreationTime int64 // in usec 632 Status StreamStatus // required 633 Shards []*StreamShard 634 NextShardId *ShardId // optional. nil means "no more shards" 635 ResponseInfo 636} 637 638type GetShardIteratorRequest struct { 639 StreamId *StreamId // required 640 ShardId *ShardId // required 641 Timestamp *int64 642 Token *string 643} 644 645type GetShardIteratorResponse struct { 646 ShardIterator *ShardIterator // required 647 Token *string 648 ResponseInfo 649} 650 651type GetStreamRecordRequest struct { 652 ShardIterator *ShardIterator // required 653 Limit *int32 // optional. max records which will reside in response 654} 655 656type GetStreamRecordResponse struct { 657 Records []*StreamRecord 658 NextShardIterator *ShardIterator // optional. an indicator to be used to read more records in this shard 659 ResponseInfo 660} 661 662type ComputeSplitPointsBySizeRequest struct { 663 TableName string 664 SplitSize int64 665} 666 667type ComputeSplitPointsBySizeResponse struct { 668 SchemaEntry []*PrimaryKeySchema 669 Splits []*Split 670 ResponseInfo 671} 672 673type Split struct { 674 LowerBound *PrimaryKey 675 UpperBound *PrimaryKey 676 Location string 677} 678 679type StreamId string 680type ShardId string 681type ShardIterator string 682type StreamStatus int 683 684const ( 685 SS_Enabling StreamStatus = iota 686 SS_Active 687) 688 689/* 690 * Shards are possibly splitted into two or merged from two. 691 * After splitting, both newly generated shards have the same FatherShard. 692 * After merging, the newly generated shard have both FatherShard and MotherShard. 693 */ 694type StreamShard struct { 695 SelfShard *ShardId // required 696 FatherShard *ShardId // optional 697 MotherShard *ShardId // optional 698} 699 700type StreamRecord struct { 701 Type ActionType 702 Info *RecordSequenceInfo // required 703 PrimaryKey *PrimaryKey // required 704 Columns []*RecordColumn 705} 706 707func (this *StreamRecord) String() string { 708 return fmt.Sprintf( 709 "{\"Type\":%s, \"PrimaryKey\":%s, \"Info\":%s, \"Columns\":%s}", 710 this.Type, 711 *this.PrimaryKey, 712 this.Info, 713 this.Columns) 714} 715 716type ActionType int 717 718const ( 719 AT_Put ActionType = iota 720 AT_Update 721 AT_Delete 722) 723 724func (this ActionType) String() string { 725 switch this { 726 case AT_Put: 727 return "\"PutRow\"" 728 case AT_Update: 729 return "\"UpdateRow\"" 730 case AT_Delete: 731 return "\"DeleteRow\"" 732 default: 733 panic(fmt.Sprintf("unknown action type: %d", int(this))) 734 } 735} 736 737type RecordSequenceInfo struct { 738 Epoch int32 739 Timestamp int64 740 RowIndex int32 741} 742 743func (this *RecordSequenceInfo) String() string { 744 return fmt.Sprintf( 745 "{\"Epoch\":%d, \"Timestamp\": %d, \"RowIndex\": %d}", 746 this.Epoch, 747 this.Timestamp, 748 this.RowIndex) 749} 750 751type RecordColumn struct { 752 Type RecordColumnType 753 Name *string // required 754 Value interface{} // optional. present when Type is RCT_Put 755 Timestamp *int64 // optional, in msec. present when Type is RCT_Put or RCT_DeleteOneVersion 756} 757 758func (this *RecordColumn) String() string { 759 xs := make([]string, 0) 760 xs = append(xs, fmt.Sprintf("\"Name\":%s", strconv.Quote(*this.Name))) 761 switch this.Type { 762 case RCT_DeleteAllVersions: 763 xs = append(xs, "\"Type\":\"DeleteAllVersions\"") 764 case RCT_DeleteOneVersion: 765 xs = append(xs, "\"Type\":\"DeleteOneVersion\"") 766 xs = append(xs, fmt.Sprintf("\"Timestamp\":%d", *this.Timestamp)) 767 case RCT_Put: 768 xs = append(xs, "\"Type\":\"Put\"") 769 xs = append(xs, fmt.Sprintf("\"Timestamp\":%d", *this.Timestamp)) 770 xs = append(xs, fmt.Sprintf("\"Value\":%s", this.Value)) 771 } 772 return fmt.Sprintf("{%s}", strings.Join(xs, ", ")) 773} 774 775type RecordColumnType int 776 777const ( 778 RCT_Put RecordColumnType = iota 779 RCT_DeleteOneVersion 780 RCT_DeleteAllVersions 781) 782 783type IndexMeta struct { 784 IndexName string 785 Primarykey []string 786 DefinedColumns []string 787 IndexType IndexType 788} 789 790type DefinedColumnSchema struct { 791 Name string 792 ColumnType DefinedColumnType 793} 794 795type IndexType int32 796 797const ( 798 IT_GLOBAL_INDEX IndexType = 1 799 IT_LOCAL_INDEX IndexType = 2 800) 801 802type DefinedColumnType int32 803 804const ( 805 /** 806 * 64位整数。 807 */ 808 DefinedColumn_INTEGER DefinedColumnType = 1 809 810 /** 811 * 浮点数。 812 */ 813 DefinedColumn_DOUBLE DefinedColumnType = 2 814 815 /** 816 * 布尔值。 817 */ 818 DefinedColumn_BOOLEAN DefinedColumnType = 3 819 820 /** 821 * 字符串。 822 */ 823 DefinedColumn_STRING DefinedColumnType = 4 824 825 /** 826 * BINARY。 827 */ 828 DefinedColumn_BINARY DefinedColumnType = 5 829) 830 831type StartLocalTransactionRequest struct { 832 PrimaryKey *PrimaryKey 833 TableName string 834} 835 836type StartLocalTransactionResponse struct { 837 TransactionId *string 838 ResponseInfo 839} 840 841type CommitTransactionRequest struct { 842 TransactionId *string 843} 844 845type CommitTransactionResponse struct { 846 ResponseInfo 847} 848 849type AbortTransactionRequest struct { 850 TransactionId *string 851} 852 853type AbortTransactionResponse struct { 854 ResponseInfo 855}