1package tablestore
2
3import (
4	"fmt"
5	"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol"
6	"github.com/golang/protobuf/proto"
7	"math/rand"
8	"net/http"
9	"strconv"
10	"strings"
11	"time"
12	//"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore"
13)
14
// TableStoreClient is the client handle for the OTS (Table Store) service.
// According to the original authors' description it connects to the service
// for authorization, creates/lists/deletes tables and table groups, and
// gets/puts/deletes/updates rows.
// Note: the original authors state that TableStoreClient is thread-safe;
// nothing in this declaration itself enforces that.
type TableStoreClient struct {
	// Service address, instance and credentials.
	endPoint        string
	instanceName    string
	accessKeyId     string
	accessKeySecret string
	securityToken   string

	// HTTP transport abstraction, client configuration and a random source.
	// NOTE(review): random is presumably used for retry jitter — confirm in
	// the retry code, which is not part of this section.
	httpClient      IHttpClient
	config          *TableStoreConfig
	random          *rand.Rand
}
32
// ClientOption is a function that receives a *TableStoreClient.
// NOTE(review): presumably applied while constructing the client to customize
// it — confirm at the constructor, which is not visible here.
type ClientOption func(*TableStoreClient)
34
// TableStoreHttpClient is an IHttpClient that forwards every request to the
// wrapped *http.Client.
type TableStoreHttpClient struct {
	httpClient *http.Client
}

// IHttpClient abstracts the HTTP client so it can be replaced by a mock in
// tests.
type IHttpClient interface {
	// Do sends the request and returns the response or an error.
	Do(*http.Request) (*http.Response, error)
	// New installs the *http.Client to use.
	New(*http.Client)
}

// Do sends req through the wrapped *http.Client and returns its result
// unchanged. The wrapped client must have been installed with New first.
func (c *TableStoreHttpClient) Do(req *http.Request) (*http.Response, error) {
	resp, err := c.httpClient.Do(req)
	return resp, err
}

// New stores client as the *http.Client used by subsequent Do calls.
func (c *TableStoreHttpClient) New(client *http.Client) {
	c.httpClient = client
}
52
// HTTPTimeout holds the two timeouts configured for HTTP traffic.
type HTTPTimeout struct {
	ConnectionTimeout time.Duration
	RequestTimeout    time.Duration
}

// TableStoreConfig collects the tunable client settings: retry limits,
// HTTP timeouts and the idle-connection cap.
type TableStoreConfig struct {
	RetryTimes         uint
	MaxRetryTime       time.Duration
	HTTPTimeout        HTTPTimeout
	MaxIdleConnections int
}

// NewDefaultTableStoreConfig returns a freshly allocated configuration with
// the defaults: 10 retries, 5s maximum retry time, 15s connection timeout,
// 30s request timeout and 2000 idle connections.
func NewDefaultTableStoreConfig() *TableStoreConfig {
	return &TableStoreConfig{
		RetryTimes:   10,
		MaxRetryTime: 5 * time.Second,
		HTTPTimeout: HTTPTimeout{
			ConnectionTimeout: 15 * time.Second,
			RequestTimeout:    30 * time.Second,
		},
		MaxIdleConnections: 2000,
	}
}
76
// CreateTableRequest bundles the inputs of a create-table call: table meta,
// table options, reserved throughput, an optional stream specification and
// index metas.
type CreateTableRequest struct {
	TableMeta          *TableMeta
	TableOption        *TableOption
	ReservedThroughput *ReservedThroughput
	StreamSpec         *StreamSpecification
	IndexMetas         []*IndexMeta
}

// CreateIndexRequest bundles the inputs of a create-index call on
// MainTableName.
type CreateIndexRequest struct {
	MainTableName   string
	IndexMeta       *IndexMeta
	// NOTE(review): presumably requests that existing rows of the main table
	// be included in the new index — confirm against the service docs.
	IncludeBaseData bool
}

// DeleteIndexRequest names the index (and its main table) to delete.
type DeleteIndexRequest struct {
	MainTableName string
	IndexName     string
}
95
// ResponseInfo carries the request id; it is embedded in the response types
// of this package.
type ResponseInfo struct {
	RequestId string
}

// CreateTableResponse carries only the embedded ResponseInfo.
type CreateTableResponse struct {
	ResponseInfo
}

// CreateIndexResponse carries only the embedded ResponseInfo.
type CreateIndexResponse struct {
	ResponseInfo
}

// DeleteIndexResponse carries only the embedded ResponseInfo.
type DeleteIndexResponse struct {
	ResponseInfo
}

// DeleteTableResponse carries only the embedded ResponseInfo.
type DeleteTableResponse struct {
	ResponseInfo
}
115
// TableMeta describes a table: its name, primary key schema entries and
// defined (pre-declared) columns.
type TableMeta struct {
	TableName      string
	SchemaEntry    []*PrimaryKeySchema
	DefinedColumns []*DefinedColumnSchema
}

// PrimaryKeySchema describes one primary key column of a table schema.
// All fields are pointers, so each may be left nil.
type PrimaryKeySchema struct {
	Name   *string
	Type   *PrimaryKeyType
	Option *PrimaryKeyOption
}

// PrimaryKey is an ordered list of primary key columns.
type PrimaryKey struct {
	PrimaryKeys []*PrimaryKeyColumn
}

// TableOption holds the table's time-to-live and max-version settings.
// NOTE(review): units of TimeToAlive are not visible here (presumably
// seconds) — confirm against the service docs.
type TableOption struct {
	TimeToAlive, MaxVersion int
}

// ReservedThroughput holds the reserved read and write capacity of a table.
type ReservedThroughput struct {
	Readcap, Writecap int
}
139
// ListTableResponse carries the table names returned by a list-table call.
type ListTableResponse struct {
	TableNames []string
	ResponseInfo
}

// DeleteTableRequest names the table to delete.
type DeleteTableRequest struct {
	TableName string
}

// DescribeTableRequest names the table to describe.
type DescribeTableRequest struct {
	TableName string
}

// DescribeTableResponse carries the table description: meta, options,
// reserved throughput, stream details and index metas.
type DescribeTableResponse struct {
	TableMeta          *TableMeta
	TableOption        *TableOption
	ReservedThroughput *ReservedThroughput
	StreamDetails      *StreamDetails
	IndexMetas         []*IndexMeta
	ResponseInfo
}

// UpdateTableRequest names the table to update together with the new
// options, reserved throughput and stream specification. All three settings
// are pointers.
// NOTE(review): presumably a nil setting means "leave unchanged" — confirm
// in the request-building code.
type UpdateTableRequest struct {
	TableName          string
	TableOption        *TableOption
	ReservedThroughput *ReservedThroughput
	StreamSpec         *StreamSpecification
}

// UpdateTableResponse carries the table settings reported after an update.
type UpdateTableResponse struct {
	TableOption        *TableOption
	ReservedThroughput *ReservedThroughput
	StreamDetails      *StreamDetails
	ResponseInfo
}
175
// ConsumedCapacityUnit reports the read and write capacity units consumed
// by an operation.
type ConsumedCapacityUnit struct {
	Read  int32
	Write int32
}

// PutRowResponse is the result of a put-row call.
// NOTE(review): PrimaryKey is presumably only populated when the request
// asked for the primary key to be returned — confirm in the response parser.
type PutRowResponse struct {
	ConsumedCapacityUnit *ConsumedCapacityUnit
	PrimaryKey           PrimaryKey
	ResponseInfo
}

// DeleteRowResponse is the result of a delete-row call.
type DeleteRowResponse struct {
	ConsumedCapacityUnit *ConsumedCapacityUnit
	ResponseInfo
}

// UpdateRowResponse is the result of an update-row call, including any
// columns returned by the service.
type UpdateRowResponse struct {
	Columns              []*AttributeColumn
	ConsumedCapacityUnit *ConsumedCapacityUnit
	ResponseInfo
}
197
// PrimaryKeyType enumerates the value types a primary key column may have.
type PrimaryKeyType int32

// Supported primary key value types.
const (
	PrimaryKeyType_INTEGER PrimaryKeyType = 1
	PrimaryKeyType_STRING  PrimaryKeyType = 2
	PrimaryKeyType_BINARY  PrimaryKeyType = 3
)
205
// Retry interval bounds (untyped integer constants).
// NOTE(review): the unit is not visible in this section (presumably
// milliseconds) — confirm where the retry back-off is computed.
const (
	DefaultRetryInterval = 10
	MaxRetryInterval     = 320
)
210
// PrimaryKeyOption tags a primary key column as carrying either an ordinary
// value or one of the special markers below.
type PrimaryKeyOption int32

// Primary key options. PrimaryKeyColumn.String renders MIN as -inf, MAX as
// +inf and AUTO_INCREMENT as auto-incr.
const (
	NONE           PrimaryKeyOption = 0
	AUTO_INCREMENT PrimaryKeyOption = 1
	MIN            PrimaryKeyOption = 2
	MAX            PrimaryKeyOption = 3
)

// PrimaryKeyColumn is one named primary key column. Value is only rendered
// by String when PrimaryKeyOption is NONE.
type PrimaryKeyColumn struct {
	ColumnName       string
	Value            interface{}
	PrimaryKeyOption PrimaryKeyOption
}

// String renders the column as a JSON-like object for debugging, e.g.
// {"Name": "pk", "Value": "5"}.
//
// For option NONE the value is printed inside double quotes: []byte values
// are printed as text, every other value uses its default fmt formatting, so
// integers no longer come out as "%!s(int64=5)". An unrecognized option
// yields only the "Name" entry.
func (c *PrimaryKeyColumn) String() string {
	xs := make([]string, 0, 2)
	xs = append(xs, fmt.Sprintf("\"Name\": \"%s\"", c.ColumnName))
	switch c.PrimaryKeyOption {
	case NONE:
		var value string
		if raw, isBytes := c.Value.([]byte); isBytes {
			// Keep the historical behaviour of printing bytes as text
			// rather than as a list of numbers.
			value = string(raw)
		} else {
			value = fmt.Sprint(c.Value)
		}
		xs = append(xs, fmt.Sprintf("\"Value\": \"%s\"", value))
	case MIN:
		xs = append(xs, "\"Value\": -inf")
	case MAX:
		xs = append(xs, "\"Value\": +inf")
	case AUTO_INCREMENT:
		xs = append(xs, "\"Value\": auto-incr")
	}
	return fmt.Sprintf("{%s}", strings.Join(xs, ", "))
}
241
// AttributeColumn is one attribute (non-primary-key) column cell: its name,
// value and timestamp.
type AttributeColumn struct {
	ColumnName string
	Value      interface{}
	Timestamp  int64
}

// TimeRange selects cell versions by timestamp.
// NOTE(review): presumably either Start/End describe a range or Specific
// names a single timestamp — confirm in the request-building code.
type TimeRange struct {
	Start    int64
	End      int64
	Specific int64
}

// ColumnToUpdate describes one column modification inside an update.
// HasType and HasTimestamp flag whether Type and Timestamp are meaningful.
// NOTE(review): the encoding of Type and the effect of IgnoreValue are not
// visible in this section — confirm in the serializer.
type ColumnToUpdate struct {
	ColumnName   string
	Type         byte
	Timestamp    int64
	HasType      bool
	HasTimestamp bool
	IgnoreValue  bool
	Value        interface{}
}
263
// RowExistenceExpectation states what a conditional write expects about the
// target row's existence.
type RowExistenceExpectation int

// Row existence expectations.
const (
	RowExistenceExpectation_IGNORE           RowExistenceExpectation = 0
	RowExistenceExpectation_EXPECT_EXIST     RowExistenceExpectation = 1
	RowExistenceExpectation_EXPECT_NOT_EXIST RowExistenceExpectation = 2
)
271
// ComparatorType enumerates the comparison operators usable in a column
// condition.
type ComparatorType int32

// Comparison operators.
const (
	CT_EQUAL         ComparatorType = 1
	CT_NOT_EQUAL     ComparatorType = 2
	CT_GREATER_THAN  ComparatorType = 3
	CT_GREATER_EQUAL ComparatorType = 4
	CT_LESS_THAN     ComparatorType = 5
	CT_LESS_EQUAL    ComparatorType = 6
)
282
// LogicalOperator enumerates the operators that combine sub-filters in a
// composite filter.
type LogicalOperator int32

// Logical operators.
const (
	LO_NOT LogicalOperator = 1
	LO_AND LogicalOperator = 2
	LO_OR  LogicalOperator = 3
)
290
// FilterType enumerates the kinds of column filter.
type FilterType int32

// Filter kinds.
const (
	FT_SINGLE_COLUMN_VALUE    FilterType = 1
	FT_COMPOSITE_COLUMN_VALUE FilterType = 2
	FT_COLUMN_PAGINATION      FilterType = 3
)
298
// ColumnFilter is implemented by every filter type in this file
// (SingleColumnCondition, CompositeColumnValueFilter, PaginationFilter).
type ColumnFilter interface {
	// Serialize returns the filter encoded as bytes.
	Serialize() []byte
	// ToFilter converts the filter into its protocol message form.
	ToFilter() *otsprotocol.Filter
}
303
// VariantType enumerates the target types of a value transfer rule cast.
type VariantType int32

// Variant types. Value 2 is intentionally not defined: the original source
// only carried a commented-out "VT_BOOLEAN = 2" entry at that position.
const (
	Variant_INTEGER VariantType = 0
	Variant_DOUBLE  VariantType = 1
	Variant_STRING  VariantType = 3
)
312
// ValueTransferRule pairs a regular expression with the variant type to
// cast to.
// NOTE(review): presumably the regex extracts part of the column value
// before the cast — confirm against the service docs.
type ValueTransferRule struct {
	Regex     string
	Cast_type VariantType
}

// SingleColumnCondition is a filter on a single column: a comparator, the
// column name and the value to compare with, plus behaviour flags and an
// optional transfer rule.
type SingleColumnCondition struct {
	Comparator        *ComparatorType
	ColumnName        *string
	ColumnValue       interface{} //[]byte
	// NOTE(review): the exact semantics of the two flags below are decided by
	// the service / NewSingleColumnValueFilter, neither visible here.
	FilterIfMissing   bool
	LatestVersionOnly bool
	TransferRule      *ValueTransferRule
}
326
// ReturnType enumerates what a write operation should return.
type ReturnType int32

// Return types.
const (
	ReturnType_RT_NONE         ReturnType = 0
	ReturnType_RT_PK           ReturnType = 1
	ReturnType_RT_AFTER_MODIFY ReturnType = 2
)
334
// PaginationFilter is a column filter defined by an offset and a limit.
type PaginationFilter struct {
	Offset int32
	Limit  int32
}

// CompositeColumnValueFilter combines several column filters with one
// logical operator.
type CompositeColumnValueFilter struct {
	Operator LogicalOperator
	Filters  []ColumnFilter
}
344
345func (ccvfilter *CompositeColumnValueFilter) Serialize() []byte {
346	result, _ := proto.Marshal(ccvfilter.ToFilter())
347	return result
348}
349
350func (ccvfilter *CompositeColumnValueFilter) ToFilter() *otsprotocol.Filter {
351	compositefilter := NewCompositeFilter(ccvfilter.Filters, ccvfilter.Operator)
352	compositeFilterToBytes, _ := proto.Marshal(compositefilter)
353	filter := new(otsprotocol.Filter)
354	filter.Type = otsprotocol.FilterType_FT_COMPOSITE_COLUMN_VALUE.Enum()
355	filter.Filter = compositeFilterToBytes
356	return filter
357}
358
359func (ccvfilter *CompositeColumnValueFilter) AddFilter(filter ColumnFilter) {
360	ccvfilter.Filters = append(ccvfilter.Filters, filter)
361}
362
363func (condition *SingleColumnCondition) ToFilter() *otsprotocol.Filter {
364	singlefilter := NewSingleColumnValueFilter(condition)
365	singleFilterToBytes, _ := proto.Marshal(singlefilter)
366	filter := new(otsprotocol.Filter)
367	filter.Type = otsprotocol.FilterType_FT_SINGLE_COLUMN_VALUE.Enum()
368	filter.Filter = singleFilterToBytes
369	return filter
370}
371
372func (condition *SingleColumnCondition) Serialize() []byte {
373	result, _ := proto.Marshal(condition.ToFilter())
374	return result
375}
376
377func (pageFilter *PaginationFilter) ToFilter() *otsprotocol.Filter {
378	compositefilter := NewPaginationFilter(pageFilter)
379	compositeFilterToBytes, _ := proto.Marshal(compositefilter)
380	filter := new(otsprotocol.Filter)
381	filter.Type = otsprotocol.FilterType_FT_COLUMN_PAGINATION.Enum()
382	filter.Filter = compositeFilterToBytes
383	return filter
384}
385
386func (pageFilter *PaginationFilter) Serialize() []byte {
387	result, _ := proto.Marshal(pageFilter.ToFilter())
388	return result
389}
390
391func NewTableOptionWithMaxVersion(maxVersion int) *TableOption {
392	tableOption := new(TableOption)
393	tableOption.TimeToAlive = -1
394	tableOption.MaxVersion = maxVersion
395	return tableOption
396}
397
398func NewTableOption(timeToAlive int, maxVersion int) *TableOption {
399	tableOption := new(TableOption)
400	tableOption.TimeToAlive = timeToAlive
401	tableOption.MaxVersion = maxVersion
402	return tableOption
403}
404
// RowCondition is the condition attached to a row write: an expectation
// about the row's existence plus an optional column filter.
type RowCondition struct {
	RowExistenceExpectation RowExistenceExpectation
	ColumnCondition         ColumnFilter
}
409
410type PutRowChange struct {
411	TableName  string
412	PrimaryKey *PrimaryKey
413	Columns    []AttributeColumn
414	Condition  *RowCondition
415	ReturnType ReturnType
416	TransactionId    *string
417}
418
419type PutRowRequest struct {
420	PutRowChange *PutRowChange
421}
422
423type DeleteRowChange struct {
424	TableName  string
425	PrimaryKey *PrimaryKey
426	Condition  *RowCondition
427	TransactionId *string
428}
429
430type DeleteRowRequest struct {
431	DeleteRowChange *DeleteRowChange
432}
433
434type SingleRowQueryCriteria struct {
435	ColumnsToGet []string
436	TableName    string
437	PrimaryKey   *PrimaryKey
438	MaxVersion   int32
439	TimeRange    *TimeRange
440	Filter       ColumnFilter
441	StartColumn  *string
442	EndColumn    *string
443	TransactionId *string
444}
445
446type UpdateRowChange struct {
447	TableName  string
448	PrimaryKey *PrimaryKey
449	Columns    []ColumnToUpdate
450	Condition  *RowCondition
451	TransactionId *string
452	ReturnType ReturnType
453	ColumnNamesToReturn    []string
454}
455
456type UpdateRowRequest struct {
457	UpdateRowChange *UpdateRowChange
458}
459
460func (rowQueryCriteria *SingleRowQueryCriteria) AddColumnToGet(columnName string) {
461	rowQueryCriteria.ColumnsToGet = append(rowQueryCriteria.ColumnsToGet, columnName)
462}
463
464func (rowQueryCriteria *SingleRowQueryCriteria) SetStartColumn(columnName string) {
465	rowQueryCriteria.StartColumn = &columnName
466}
467
468func (rowQueryCriteria *SingleRowQueryCriteria) SetEndtColumn(columnName string) {
469	rowQueryCriteria.EndColumn = &columnName
470}
471
472func (rowQueryCriteria *SingleRowQueryCriteria) getColumnsToGet() []string {
473	return rowQueryCriteria.ColumnsToGet
474}
475
476func (rowQueryCriteria *MultiRowQueryCriteria) AddColumnToGet(columnName string) {
477	rowQueryCriteria.ColumnsToGet = append(rowQueryCriteria.ColumnsToGet, columnName)
478}
479
480func (rowQueryCriteria *RangeRowQueryCriteria) AddColumnToGet(columnName string) {
481	rowQueryCriteria.ColumnsToGet = append(rowQueryCriteria.ColumnsToGet, columnName)
482}
483
484func (rowQueryCriteria *MultiRowQueryCriteria) AddRow(pk *PrimaryKey) {
485	rowQueryCriteria.PrimaryKey = append(rowQueryCriteria.PrimaryKey, pk)
486}
487
// GetRowRequest wraps the criteria of a single-row read.
type GetRowRequest struct {
	SingleRowQueryCriteria *SingleRowQueryCriteria
}

// MultiRowQueryCriteria describes a read of several rows of one table: the
// primary keys to read, which columns and versions to return, an optional
// filter and optional column bounds.
type MultiRowQueryCriteria struct {
	PrimaryKey   []*PrimaryKey
	ColumnsToGet []string
	TableName    string
	MaxVersion   int
	TimeRange    *TimeRange
	Filter       ColumnFilter
	StartColumn  *string
	EndColumn    *string
}

// BatchGetRowRequest holds a list of multi-row criteria.
// NOTE(review): presumably one entry per table — confirm in the
// request-building code.
type BatchGetRowRequest struct {
	MultiRowQueryCriteria []*MultiRowQueryCriteria
}

// ColumnMap groups attribute columns by name.
// NOTE(review): columnsKey presumably keeps the column names in a stable
// order — confirm where it is populated.
type ColumnMap struct {
	Columns    map[string][]*AttributeColumn
	columnsKey []string
}

// GetRowResponse is the result of a single-row read: the row's primary key
// and columns plus the consumed capacity. columnMap is unexported and filled
// elsewhere.
type GetRowResponse struct {
	PrimaryKey           PrimaryKey
	Columns              []*AttributeColumn
	ConsumedCapacityUnit *ConsumedCapacityUnit
	columnMap            *ColumnMap
	ResponseInfo
}
519
// Error is a code/message pair describing a per-row failure.
type Error struct {
	Code    string
	Message string
}

// RowResult is the outcome of one row inside a batch operation. IsSucceed
// tells whether the row operation succeeded; Error holds the failure
// details.
// NOTE(review): Index presumably is the row's position in the request —
// confirm in the response parser.
type RowResult struct {
	TableName            string
	IsSucceed            bool
	Error                Error
	PrimaryKey           PrimaryKey
	Columns              []*AttributeColumn
	ConsumedCapacityUnit *ConsumedCapacityUnit
	Index                int32
}

// RowChange is the interface a row modification must implement to be placed
// in a BatchWriteRowRequest.
type RowChange interface {
	// Serialize returns the change encoded as bytes.
	Serialize() []byte
	// getOperationType returns the protocol operation type of the change.
	getOperationType() otsprotocol.OperationType
	// getCondition returns the protocol form of the change's condition.
	getCondition() *otsprotocol.Condition
	// GetTableName returns the name of the table the change targets.
	GetTableName() string
}

// BatchGetRowResponse maps table names to the per-row results of a batch
// read.
type BatchGetRowResponse struct {
	TableToRowsResult map[string][]RowResult
	ResponseInfo
}

// BatchWriteRowRequest maps table names to the row changes to apply.
type BatchWriteRowRequest struct {
	RowChangesGroupByTable map[string][]RowChange
}

// BatchWriteRowResponse maps table names to the per-row results of a batch
// write.
type BatchWriteRowResponse struct {
	TableToRowsResult map[string][]RowResult
	ResponseInfo
}
555
// Direction is the scan direction of a range read.
type Direction int32

// Scan directions.
const (
	FORWARD  Direction = 0
	BACKWARD Direction = 1
)
562
563type RangeRowQueryCriteria struct {
564	TableName       string
565	StartPrimaryKey *PrimaryKey
566	EndPrimaryKey   *PrimaryKey
567	ColumnsToGet    []string
568	MaxVersion      int32
569	TimeRange       *TimeRange
570	Filter          ColumnFilter
571	Direction       Direction
572	Limit           int32
573	StartColumn     *string
574	EndColumn       *string
575	TransactionId    *string
576}
577
// GetRangeRequest wraps the criteria of a range read.
type GetRangeRequest struct {
	RangeRowQueryCriteria *RangeRowQueryCriteria
}

// Row is one returned row: its primary key and attribute columns.
type Row struct {
	PrimaryKey *PrimaryKey
	Columns    []*AttributeColumn
}

// GetRangeResponse is the result of a range read.
// NOTE(review): NextStartPrimaryKey is presumably nil once the range is
// exhausted — confirm in the response parser.
type GetRangeResponse struct {
	Rows                 []*Row
	ConsumedCapacityUnit *ConsumedCapacityUnit
	NextStartPrimaryKey  *PrimaryKey
	ResponseInfo
}
593
// ListStreamRequest optionally names the table whose streams to list.
// TableName is a pointer, so it may be nil.
type ListStreamRequest struct {
	TableName *string
}

// Stream identifies one stream: its id, the owning table and its creation
// time.
// NOTE(review): the unit of CreationTime is not stated here — confirm.
type Stream struct {
	Id           *StreamId
	TableName    *string
	CreationTime int64
}

// ListStreamResponse carries the streams returned by a list-stream call.
type ListStreamResponse struct {
	Streams []Stream
	ResponseInfo
}
608
// StreamSpecification is the stream setting supplied when creating or
// updating a table.
type StreamSpecification struct {
	EnableStream   bool
	ExpirationTime int32 // must be positive. in hours
}

// StreamDetails is the stream state reported for a table.
type StreamDetails struct {
	EnableStream   bool
	StreamId       *StreamId // nil when stream is disabled.
	ExpirationTime int32     // in hours
	LastEnableTime int64     // the last time stream is enabled, in usec
}
620
// DescribeStreamRequest asks for the description of one stream, optionally
// starting at a given shard and limited to a number of shards.
type DescribeStreamRequest struct {
	StreamId              *StreamId // required
	InclusiveStartShardId *ShardId  // optional
	ShardLimit            *int32    // optional
}

// DescribeStreamResponse describes a stream and lists its shards.
type DescribeStreamResponse struct {
	StreamId       *StreamId    // required
	ExpirationTime int32        // in hours
	TableName      *string      // required
	CreationTime   int64        // in usec
	Status         StreamStatus // required
	Shards         []*StreamShard
	NextShardId    *ShardId     // optional. nil means "no more shards"
	ResponseInfo
}

// GetShardIteratorRequest asks for an iterator over one shard of a stream.
// NOTE(review): Timestamp and Token are optional pointers whose exact
// meaning is not visible here — confirm against the service docs.
type GetShardIteratorRequest struct {
	StreamId  *StreamId // required
	ShardId   *ShardId  // required
	Timestamp *int64
	Token     *string
}

// GetShardIteratorResponse carries the shard iterator and an optional token.
type GetShardIteratorResponse struct {
	ShardIterator *ShardIterator // required
	Token         *string
	ResponseInfo
}

// GetStreamRecordRequest asks for records starting at a shard iterator.
type GetStreamRecordRequest struct {
	ShardIterator *ShardIterator // required
	Limit         *int32         // optional. max records which will reside in response
}

// GetStreamRecordResponse carries the records read and the iterator to
// continue from.
type GetStreamRecordResponse struct {
	Records           []*StreamRecord
	NextShardIterator *ShardIterator // optional. an indicator to be used to read more records in this shard
	ResponseInfo
}
661
// ComputeSplitPointsBySizeRequest names the table to split and the split
// size.
// NOTE(review): the unit of SplitSize is not visible here — confirm against
// the service docs.
type ComputeSplitPointsBySizeRequest struct {
	TableName string
	SplitSize int64
}

// ComputeSplitPointsBySizeResponse carries the table's primary key schema
// and the computed splits.
type ComputeSplitPointsBySizeResponse struct {
	SchemaEntry []*PrimaryKeySchema
	Splits      []*Split
	ResponseInfo
}

// Split is one computed split: its lower and upper primary key bounds and a
// location string.
type Split struct {
	LowerBound *PrimaryKey
	UpperBound *PrimaryKey
	Location   string
}
678
// StreamId is the string identifier of a stream.
type StreamId string

// ShardId is the string identifier of a stream shard.
type ShardId string

// ShardIterator is the string form of an iterator over a shard.
type ShardIterator string

// StreamStatus is the status of a stream.
type StreamStatus int

// Stream statuses.
const (
	SS_Enabling StreamStatus = 0
	SS_Active   StreamStatus = 1
)
688
// StreamShard identifies a shard together with the shard(s) it derives from.
//
// Shards may be split into two or merged from two.
// After a split, both newly generated shards have the same FatherShard.
// After a merge, the newly generated shard has both a FatherShard and a
// MotherShard.
type StreamShard struct {
	SelfShard   *ShardId // required
	FatherShard *ShardId // optional
	MotherShard *ShardId // optional
}
699
700type StreamRecord struct {
701	Type       ActionType
702	Info       *RecordSequenceInfo // required
703	PrimaryKey *PrimaryKey         // required
704	Columns    []*RecordColumn
705}
706
707func (this *StreamRecord) String() string {
708	return fmt.Sprintf(
709		"{\"Type\":%s, \"PrimaryKey\":%s, \"Info\":%s, \"Columns\":%s}",
710		this.Type,
711		*this.PrimaryKey,
712		this.Info,
713		this.Columns)
714}
715
// ActionType is the kind of row operation a stream record describes.
type ActionType int

// Action types.
const (
	AT_Put ActionType = iota
	AT_Update
	AT_Delete
)

// String returns the action name wrapped in double quotes ("PutRow",
// "UpdateRow" or "DeleteRow", quotes included) so it can be embedded directly
// in the JSON-like output of StreamRecord.String.
//
// An unrecognized value yields "Unknown(<n>)" (also quoted) instead of
// panicking: a String method is typically reached from logging and debug
// printing, where crashing the caller over an unexpected enum value is never
// desirable.
func (a ActionType) String() string {
	switch a {
	case AT_Put:
		return "\"PutRow\""
	case AT_Update:
		return "\"UpdateRow\""
	case AT_Delete:
		return "\"DeleteRow\""
	default:
		return fmt.Sprintf("\"Unknown(%d)\"", int(a))
	}
}
736
// RecordSequenceInfo carries the three values (epoch, timestamp, row index)
// attached to a stream record.
type RecordSequenceInfo struct {
	Epoch     int32
	Timestamp int64
	RowIndex  int32
}

// String renders the sequence info as a JSON-like object, e.g.
// {"Epoch":1, "Timestamp": 2, "RowIndex": 3}.
func (info *RecordSequenceInfo) String() string {
	const layout = "{\"Epoch\":%d, \"Timestamp\": %d, \"RowIndex\": %d}"
	return fmt.Sprintf(layout, info.Epoch, info.Timestamp, info.RowIndex)
}
750
// RecordColumn is one column change inside a stream record.
type RecordColumn struct {
	Type      RecordColumnType
	Name      *string     // required
	Value     interface{} // optional. present when Type is RCT_Put
	Timestamp *int64      // optional, in msec. present when Type is RCT_Put or RCT_DeleteOneVersion
}

// String renders the column change as a JSON-like object for debugging, e.g.
// {"Name":"col", "Type":"Put", "Timestamp":7, "Value":42}.
//
// The value is printed unquoted: []byte values are printed as text and every
// other value uses its default fmt formatting, so integers no longer come out
// as "%!s(int64=42)". A nil Timestamp is omitted and a nil Name is rendered
// as an empty name instead of panicking, because String is typically reached
// from logging code that should not crash on an incomplete record.
func (c *RecordColumn) String() string {
	xs := make([]string, 0, 4)

	var name string
	if c.Name != nil {
		name = *c.Name
	}
	xs = append(xs, fmt.Sprintf("\"Name\":%s", strconv.Quote(name)))

	switch c.Type {
	case RCT_DeleteAllVersions:
		xs = append(xs, "\"Type\":\"DeleteAllVersions\"")
	case RCT_DeleteOneVersion:
		xs = append(xs, "\"Type\":\"DeleteOneVersion\"")
		if c.Timestamp != nil {
			xs = append(xs, fmt.Sprintf("\"Timestamp\":%d", *c.Timestamp))
		}
	case RCT_Put:
		xs = append(xs, "\"Type\":\"Put\"")
		if c.Timestamp != nil {
			xs = append(xs, fmt.Sprintf("\"Timestamp\":%d", *c.Timestamp))
		}
		var value string
		if raw, isBytes := c.Value.([]byte); isBytes {
			// Keep the historical behaviour of printing bytes as text
			// rather than as a list of numbers.
			value = string(raw)
		} else {
			value = fmt.Sprint(c.Value)
		}
		xs = append(xs, "\"Value\":"+value)
	}
	return fmt.Sprintf("{%s}", strings.Join(xs, ", "))
}

// RecordColumnType is the kind of change a RecordColumn describes.
type RecordColumnType int

// Record column change kinds.
const (
	RCT_Put RecordColumnType = iota
	RCT_DeleteOneVersion
	RCT_DeleteAllVersions
)
782
// IndexMeta describes a secondary index: its name, the names of its primary
// key columns, the names of its defined columns and the index type.
type IndexMeta struct {
	IndexName      string
	Primarykey     []string
	DefinedColumns []string
	IndexType      IndexType
}

// DefinedColumnSchema describes one defined (pre-declared) column: its name
// and type.
type DefinedColumnSchema struct {
	Name       string
	ColumnType DefinedColumnType
}
794
// IndexType distinguishes global from local secondary indexes.
type IndexType int32

// Index types.
const (
	IT_GLOBAL_INDEX IndexType = 1
	IT_LOCAL_INDEX  IndexType = 2
)
801
// DefinedColumnType enumerates the value types of a defined column.
type DefinedColumnType int32

const (
	// DefinedColumn_INTEGER is a 64-bit integer.
	DefinedColumn_INTEGER DefinedColumnType = 1

	// DefinedColumn_DOUBLE is a floating-point number.
	DefinedColumn_DOUBLE DefinedColumnType = 2

	// DefinedColumn_BOOLEAN is a boolean value.
	DefinedColumn_BOOLEAN DefinedColumnType = 3

	// DefinedColumn_STRING is a string.
	DefinedColumn_STRING DefinedColumnType = 4

	// DefinedColumn_BINARY is binary data.
	DefinedColumn_BINARY DefinedColumnType = 5
)
830
831type StartLocalTransactionRequest struct {
832	PrimaryKey *PrimaryKey
833	TableName string
834}
835
836type StartLocalTransactionResponse struct {
837	TransactionId    *string
838	ResponseInfo
839}
840
841type CommitTransactionRequest struct {
842	TransactionId    *string
843}
844
845type CommitTransactionResponse struct {
846	ResponseInfo
847}
848
849type AbortTransactionRequest struct {
850	TransactionId    *string
851}
852
853type AbortTransactionResponse struct {
854	ResponseInfo
855}