1package tablestore
2
3import (
4	"bytes"
5	"errors"
6	"fmt"
7
8	"github.com/aliyun/aliyun-tablestore-go-sdk/tablestore/otsprotocol"
9	"github.com/golang/protobuf/proto"
10)
11
12func (tableStoreClient *TableStoreClient) CreateSearchIndex(request *CreateSearchIndexRequest) (*CreateSearchIndexResponse, error) {
13	req := new(otsprotocol.CreateSearchIndexRequest)
14	req.TableName = proto.String(request.TableName)
15	req.IndexName = proto.String(request.IndexName)
16	var err error
17	req.Schema, err = convertToPbSchema(request.IndexSchema)
18	if err != nil {
19		return nil, err
20	}
21	resp := new(otsprotocol.CreateSearchIndexRequest)
22	response := &CreateSearchIndexResponse{}
23	if err := tableStoreClient.doRequestWithRetry(createSearchIndexUri, req, resp, &response.ResponseInfo); err != nil {
24		return nil, err
25	}
26	return response, nil
27}
28
29func (tableStoreClient *TableStoreClient) DeleteSearchIndex(request *DeleteSearchIndexRequest) (*DeleteSearchIndexResponse, error) {
30	req := new(otsprotocol.DeleteSearchIndexRequest)
31	req.TableName = proto.String(request.TableName)
32	req.IndexName = proto.String(request.IndexName)
33
34	resp := new(otsprotocol.DeleteSearchIndexResponse)
35	response := &DeleteSearchIndexResponse{}
36	if err := tableStoreClient.doRequestWithRetry(deleteSearchIndexUri, req, resp, &response.ResponseInfo); err != nil {
37		return nil, err
38	}
39	return response, nil
40}
41
42func (tableStoreClient *TableStoreClient) ListSearchIndex(request *ListSearchIndexRequest) (*ListSearchIndexResponse, error) {
43	req := new(otsprotocol.ListSearchIndexRequest)
44	req.TableName = proto.String(request.TableName)
45
46	resp := new(otsprotocol.ListSearchIndexResponse)
47	response := &ListSearchIndexResponse{}
48	if err := tableStoreClient.doRequestWithRetry(listSearchIndexUri, req, resp, &response.ResponseInfo); err != nil {
49		return nil, err
50	}
51	indexs := make([]*IndexInfo, 0)
52	for _, info := range resp.Indices {
53		indexs = append(indexs, &IndexInfo{
54			TableName: *info.TableName,
55			IndexName: *info.IndexName,
56		})
57	}
58	response.IndexInfo = indexs
59	return response, nil
60}
61
62func (tableStoreClient *TableStoreClient) DescribeSearchIndex(request *DescribeSearchIndexRequest) (*DescribeSearchIndexResponse, error) {
63	req := new(otsprotocol.DescribeSearchIndexRequest)
64	req.TableName = proto.String(request.TableName)
65	req.IndexName = proto.String(request.IndexName)
66
67	resp := new(otsprotocol.DescribeSearchIndexResponse)
68	response := &DescribeSearchIndexResponse{}
69	if err := tableStoreClient.doRequestWithRetry(describeSearchIndexUri, req, resp, &response.ResponseInfo); err != nil {
70		return nil, err
71	}
72	schema, err := parseFromPbSchema(resp.Schema)
73	if err != nil {
74		return nil, err
75	}
76	response.Schema = schema
77	if resp.SyncStat != nil {
78		response.SyncStat = &SyncStat{
79			CurrentSyncTimestamp: resp.SyncStat.CurrentSyncTimestamp,
80		}
81		syncPhase := resp.SyncStat.SyncPhase
82		if syncPhase == nil {
83			return nil, errors.New("missing [SyncPhase] in DescribeSearchIndexResponse")
84		} else if *syncPhase == otsprotocol.SyncPhase_FULL {
85			response.SyncStat.SyncPhase = SyncPhase_FULL
86		} else if *syncPhase == otsprotocol.SyncPhase_INCR {
87			response.SyncStat.SyncPhase = SyncPhase_INCR
88		} else {
89			return nil, errors.New(fmt.Sprintf("unknown SyncPhase: %v", syncPhase))
90		}
91	}
92	return response, nil
93}
94
95func (tableStoreClient *TableStoreClient) Search(request *SearchRequest) (*SearchResponse, error) {
96	req, err := request.ProtoBuffer()
97	if err != nil {
98		return nil, err
99	}
100	resp := new(otsprotocol.SearchResponse)
101	response := &SearchResponse{}
102	if err := tableStoreClient.doRequestWithRetry(searchUri, req, resp, &response.ResponseInfo); err != nil {
103		return nil, err
104	}
105	response.TotalCount = *resp.TotalHits
106
107	rows := make([]*PlainBufferRow, 0)
108	for _, buf := range resp.Rows {
109		row, err := readRowsWithHeader(bytes.NewReader(buf))
110		if err != nil {
111			return nil, err
112		}
113		rows = append(rows, row[0])
114	}
115
116	for _, row := range rows {
117		currentRow := &Row{}
118		currentPk := new(PrimaryKey)
119		for _, pk := range row.primaryKey {
120			pkColumn := &PrimaryKeyColumn{ColumnName: string(pk.cellName), Value: pk.cellValue.Value}
121			currentPk.PrimaryKeys = append(currentPk.PrimaryKeys, pkColumn)
122		}
123		currentRow.PrimaryKey = currentPk
124		for _, cell := range row.cells {
125			dataColumn := &AttributeColumn{ColumnName: string(cell.cellName), Value: cell.cellValue.Value, Timestamp: cell.cellTimestamp}
126			currentRow.Columns = append(currentRow.Columns, dataColumn)
127		}
128		response.Rows = append(response.Rows, currentRow)
129	}
130
131	response.IsAllSuccess = *resp.IsAllSucceeded
132	if resp.NextToken != nil && len(resp.NextToken) > 0 {
133		response.NextToken = resp.NextToken
134	}
135	return response, nil
136}
137