1package tablestore
2
3import (
4	"fmt"
5	. "gopkg.in/check.v1"
6	"math/rand"
7	"net/http"
8	"os"
9	"runtime"
10	"strconv"
11	"strings"
12	"testing"
13	"time"
14	"io"
15	"syscall"
16)
17
18// Hook up gocheck into the "go test" runner.
19func Test(t *testing.T) {
20	TestingT(t)
21}
22
23type TableStoreSuite struct{}
24
25var tableNamePrefix string
26
27var _ = Suite(&TableStoreSuite{})
28
29var defaultTableName = "defaulttable"
30var rangeQueryTableName = "rangetable"
31
32// Todo: use config
33var client TableStoreApi
34var invalidClient TableStoreApi
35
36func (s *TableStoreSuite) SetUpSuite(c *C) {
37
38	endpoint := os.Getenv("OTS_TEST_ENDPOINT")
39	instanceName := os.Getenv("OTS_TEST_INSTANCENAME")
40	accessKeyId := os.Getenv("OTS_TEST_KEYID")
41	accessKeySecret := os.Getenv("OTS_TEST_SECRET")
42	client = NewClient(endpoint, instanceName, accessKeyId, accessKeySecret)
43
44	tableNamePrefix = strings.Replace(runtime.Version(), ".", "", -1)
45	defaultTableName = tableNamePrefix + defaultTableName
46	rangeQueryTableName = tableNamePrefix + rangeQueryTableName
47	PrepareTable(defaultTableName)
48	PrepareTable2(rangeQueryTableName)
49	invalidClient = NewClient(endpoint, instanceName, accessKeyId, "invalidsecret")
50}
51
52func PrepareTable(tableName string) error {
53	createtableRequest := new(CreateTableRequest)
54	tableMeta := new(TableMeta)
55	tableMeta.TableName = tableName
56	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
57	tableOption := new(TableOption)
58	tableOption.TimeToAlive = -1
59	tableOption.MaxVersion = 3
60	reservedThroughput := new(ReservedThroughput)
61	reservedThroughput.Readcap = 0
62	reservedThroughput.Writecap = 0
63	createtableRequest.TableMeta = tableMeta
64	createtableRequest.TableOption = tableOption
65	createtableRequest.ReservedThroughput = reservedThroughput
66	_, error := client.CreateTable(createtableRequest)
67	return error
68}
69
70func PrepareTable2(tableName string) error {
71	createtableRequest := new(CreateTableRequest)
72	tableMeta := new(TableMeta)
73	tableMeta.TableName = tableName
74	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
75	tableMeta.AddPrimaryKeyColumn("pk2", PrimaryKeyType_STRING)
76	tableOption := new(TableOption)
77	tableOption.TimeToAlive = -1
78	tableOption.MaxVersion = 3
79	reservedThroughput := new(ReservedThroughput)
80	reservedThroughput.Readcap = 0
81	reservedThroughput.Writecap = 0
82	createtableRequest.TableMeta = tableMeta
83	createtableRequest.TableOption = tableOption
84	createtableRequest.ReservedThroughput = reservedThroughput
85	_, error := client.CreateTable(createtableRequest)
86	return error
87}
88
89func (s *TableStoreSuite) TestCreateTable(c *C) {
90	fmt.Println("TestCreateTable finished")
91
92	tableName := tableNamePrefix + "testcreatetable1"
93
94	deleteReq := new(DeleteTableRequest)
95	deleteReq.TableName = tableName
96	client.DeleteTable(deleteReq)
97
98	createtableRequest := new(CreateTableRequest)
99
100	tableMeta := new(TableMeta)
101	tableMeta.TableName = tableName
102	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
103
104	tableOption := new(TableOption)
105
106	tableOption.TimeToAlive = -1
107	tableOption.MaxVersion = 3
108
109	reservedThroughput := new(ReservedThroughput)
110	reservedThroughput.Readcap = 0
111	reservedThroughput.Writecap = 0
112
113	createtableRequest.TableMeta = tableMeta
114	createtableRequest.TableOption = tableOption
115	createtableRequest.ReservedThroughput = reservedThroughput
116
117	_, error := client.CreateTable(createtableRequest)
118	c.Check(error, Equals, nil)
119
120	fmt.Println("TestCreateTable finished")
121}
122
123func (s *TableStoreSuite) TestReCreateTableAndPutRow(c *C) {
124	fmt.Println("TestReCreateTableAndPutRow started")
125
126	tableName := tableNamePrefix + "testrecreatetable1"
127
128	deleteReq := new(DeleteTableRequest)
129	deleteReq.TableName = tableName
130	client.DeleteTable(deleteReq)
131
132	createtableRequest := new(CreateTableRequest)
133
134	tableMeta := new(TableMeta)
135	tableMeta.TableName = tableName
136	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
137
138	tableOption := new(TableOption)
139
140	tableOption.TimeToAlive = -1
141	tableOption.MaxVersion = 3
142
143	reservedThroughput := new(ReservedThroughput)
144	reservedThroughput.Readcap = 0
145	reservedThroughput.Writecap = 0
146
147	createtableRequest.TableMeta = tableMeta
148	createtableRequest.TableOption = tableOption
149	createtableRequest.ReservedThroughput = reservedThroughput
150
151	_, error := client.CreateTable(createtableRequest)
152	c.Check(error, Equals, nil)
153
154	//time.Sleep(500 * time.Millisecond)
155	_, error = client.DeleteTable(deleteReq)
156	c.Check(error, Equals, nil)
157
158	_, error = client.CreateTable(createtableRequest)
159	c.Check(error, Equals, nil)
160
161	putRowRequest := new(PutRowRequest)
162	putRowChange := new(PutRowChange)
163	putRowChange.TableName = tableName
164	putPk := new(PrimaryKey)
165	putPk.AddPrimaryKeyColumn("pk1", "key1")
166	putRowChange.PrimaryKey = putPk
167	putRowChange.AddColumn("col1", "col1data1")
168	putRowChange.AddColumn("col2", int64(100))
169	putRowChange.AddColumn("col3", float64(2.1))
170	putRowChange.AddColumn("col4", true)
171	putRowChange.AddColumn("col5", int64(50))
172	putRowChange.AddColumn("col6", int64(60))
173	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
174	putRowRequest.PutRowChange = putRowChange
175	_, error = client.PutRow(putRowRequest)
176	c.Check(error, Equals, nil)
177
178	fmt.Println("TestReCreateTableAndPutRow finished")
179}
180
181func (s *TableStoreSuite) TestListTable(c *C) {
182	listtables, error := client.ListTable()
183	c.Check(error, Equals, nil)
184	defaultTableExist := false
185	for _, table := range listtables.TableNames {
186		fmt.Println(table)
187		if table == defaultTableName {
188			defaultTableExist = true
189			break
190		}
191	}
192
193	c.Check(defaultTableExist, Equals, true)
194}
195
196func (s *TableStoreSuite) TestUpdateAndDescribeTable(c *C) {
197	fmt.Println("TestUpdateAndDescribeTable started")
198	updateTableReq := new(UpdateTableRequest)
199	updateTableReq.TableName = defaultTableName
200	updateTableReq.TableOption = new(TableOption)
201	updateTableReq.TableOption.TimeToAlive = -1
202	updateTableReq.TableOption.MaxVersion = 5
203
204	updateTableResp, error := client.UpdateTable(updateTableReq)
205	c.Assert(error, Equals, nil)
206	c.Assert(updateTableResp, NotNil)
207	c.Assert(updateTableResp.TableOption.TimeToAlive, Equals, updateTableReq.TableOption.TimeToAlive)
208	c.Assert(updateTableResp.TableOption.MaxVersion, Equals, updateTableReq.TableOption.MaxVersion)
209
210	describeTableReq := new(DescribeTableRequest)
211	describeTableReq.TableName = defaultTableName
212	describ, error := client.DescribeTable(describeTableReq)
213	c.Assert(error, Equals, nil)
214
215	c.Assert(describ, NotNil)
216	c.Assert(describ.TableOption.TimeToAlive, Equals, updateTableReq.TableOption.TimeToAlive)
217	c.Assert(describ.TableOption.MaxVersion, Equals, updateTableReq.TableOption.MaxVersion)
218	fmt.Println("TestUpdateAndDescribeTable finished")
219}
220
221func (s *TableStoreSuite) TestTableWithKeyAutoIncrement(c *C) {
222	tableName := tableNamePrefix + "incrementtable"
223	createtableRequest := new(CreateTableRequest)
224
225	tableMeta := new(TableMeta)
226	tableMeta.TableName = tableName
227	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
228	tableMeta.AddPrimaryKeyColumnOption("pk2", PrimaryKeyType_INTEGER, AUTO_INCREMENT)
229
230	tableOption := new(TableOption)
231	tableOption.TimeToAlive = -1
232	tableOption.MaxVersion = 3
233
234	reservedThroughput := new(ReservedThroughput)
235	reservedThroughput.Readcap = 0
236	reservedThroughput.Writecap = 0
237
238	createtableRequest.TableMeta = tableMeta
239	createtableRequest.TableOption = tableOption
240	createtableRequest.ReservedThroughput = reservedThroughput
241
242	client.CreateTable(createtableRequest)
243	rowCount := 100
244	for i := 0; i < rowCount; i++ {
245		putRowRequest := new(PutRowRequest)
246		putRowChange := new(PutRowChange)
247		putRowChange.TableName = tableName
248		putPk := new(PrimaryKey)
249		putPk.AddPrimaryKeyColumn("pk1", "key"+strconv.Itoa(i))
250		putPk.AddPrimaryKeyColumnWithAutoIncrement("pk2")
251		putRowChange.PrimaryKey = putPk
252		putRowChange.AddColumn("col1", "col1data1")
253		putRowChange.AddColumn("col2", int64(100))
254		putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
255		putRowRequest.PutRowChange = putRowChange
256		putRowRequest.PutRowChange.SetReturnPk()
257		response, error := client.PutRow(putRowRequest)
258		c.Check(error, Equals, nil)
259		c.Check(len(response.PrimaryKey.PrimaryKeys), Equals, 2)
260		c.Check(response.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1")
261		c.Check(response.PrimaryKey.PrimaryKeys[0].Value, Equals, "key"+strconv.Itoa(i))
262		c.Check(response.PrimaryKey.PrimaryKeys[1].ColumnName, Equals, "pk2")
263		c.Check(response.PrimaryKey.PrimaryKeys[1].Value.(int64) > 0, Equals, true)
264
265		fmt.Println(response.PrimaryKey.PrimaryKeys[1].Value)
266	}
267
268	describeTableReq := new(DescribeTableRequest)
269	describeTableReq.TableName = tableName
270	_, error := client.DescribeTable(describeTableReq)
271	c.Check(error, IsNil)
272}
273
274func (s *TableStoreSuite) TestPutGetRow(c *C) {
275	fmt.Println("TestPutGetRow started")
276	putRowRequest := new(PutRowRequest)
277	putRowChange := new(PutRowChange)
278	putRowChange.TableName = defaultTableName
279	putPk := new(PrimaryKey)
280	putPk.AddPrimaryKeyColumn("pk1", "Key6")
281	putRowChange.PrimaryKey = putPk
282	putRowChange.AddColumn("col1", "col1data1")
283	putRowChange.AddColumn("col2", int64(100))
284	putRowChange.AddColumn("col3", float64(2.1))
285	putRowChange.AddColumn("col4", true)
286	putRowChange.AddColumn("col5", int64(50))
287	putRowChange.AddColumn("col6", int64(60))
288	putRowChange.AddColumn("col7", []byte("testbytes"))
289	putRowChange.AddColumn("col8", false)
290	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
291	putRowRequest.PutRowChange = putRowChange
292	_, error := client.PutRow(putRowRequest)
293	c.Check(error, Equals, nil)
294
295	getRowRequest := new(GetRowRequest)
296	criteria := new(SingleRowQueryCriteria)
297	criteria.PrimaryKey = putPk
298	getRowRequest.SingleRowQueryCriteria = criteria
299	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
300	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
301	getResp, error := client.GetRow(getRowRequest)
302	c.Check(error, Equals, nil)
303	c.Check(getResp, NotNil)
304	c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1)
305	c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1")
306	c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, "Key6")
307	c.Check(len(getResp.Columns), Equals, 8)
308	c.Check(getResp.Columns[0].ColumnName, Equals, "col1")
309	c.Check(getResp.Columns[0].Value, Equals, "col1data1")
310	c.Check(getResp.Columns[1].ColumnName, Equals, "col2")
311	c.Check(getResp.Columns[1].Value, Equals, int64(100))
312	c.Check(getResp.Columns[2].ColumnName, Equals, "col3")
313	c.Check(getResp.Columns[2].Value, Equals, float64(2.1))
314	c.Check(getResp.Columns[3].ColumnName, Equals, "col4")
315	c.Check(getResp.Columns[3].Value, Equals, true)
316	c.Check(getResp.Columns[4].ColumnName, Equals, "col5")
317	c.Check(getResp.Columns[4].Value, Equals, int64(50))
318	c.Check(getResp.Columns[5].ColumnName, Equals, "col6")
319	c.Check(getResp.Columns[5].Value, Equals, int64(60))
320	c.Check(getResp.Columns[6].ColumnName, Equals, "col7")
321	mapData := getResp.GetColumnMap()
322	c.Check(mapData.Columns["col1"][0].Value, Equals, "col1data1")
323	c.Check(mapData.Columns["col2"][0].Value, Equals, int64(100))
324	c.Check(mapData.Columns["col3"][0].Value, Equals, float64(2.1))
325	c.Check(mapData.Columns["col4"][0].Value, Equals, true)
326	c.Check(mapData.Columns["col5"][0].Value, Equals, int64(50))
327	c.Check(mapData.Columns["col6"][0].Value, Equals, int64(60))
328
329	sortedColumn, error := mapData.GetRange(2, 2)
330	c.Check(error, Equals, nil)
331	c.Check(len(sortedColumn), Equals, 2)
332	c.Check(sortedColumn[0], Equals, mapData.Columns["col3"][0])
333	c.Check(sortedColumn[1], Equals, mapData.Columns["col4"][0])
334
335	mapData2 := getResp.GetColumnMap()
336	c.Check(mapData2.Columns["col1"][0].Value, Equals, "col1data1")
337
338	_, error = mapData.GetRange(2, 10)
339	c.Check(error, NotNil)
340	// Test add column to get
341	getRowRequest = new(GetRowRequest)
342	criteria = new(SingleRowQueryCriteria)
343	criteria.PrimaryKey = putPk
344	getRowRequest.SingleRowQueryCriteria = criteria
345	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
346	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
347	getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1")
348	getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col2")
349
350	getResp, error = client.GetRow(getRowRequest)
351
352	c.Check(error, Equals, nil)
353	c.Check(getResp, NotNil)
354	c.Check(len(getResp.Columns), Equals, 2)
355
356	_, error = invalidClient.GetRow(getRowRequest)
357	c.Check(error, NotNil)
358
359	getRowRequest = new(GetRowRequest)
360	criteria = new(SingleRowQueryCriteria)
361	criteria.PrimaryKey = putPk
362	getRowRequest.SingleRowQueryCriteria = criteria
363	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
364	_, error = client.GetRow(getRowRequest)
365	c.Check(error, NotNil)
366
367	notExistPk := new(PrimaryKey)
368	notExistPk.AddPrimaryKeyColumn("pk1", "notexistpk")
369	getRowRequest = new(GetRowRequest)
370	criteria = new(SingleRowQueryCriteria)
371
372	criteria.PrimaryKey = notExistPk
373	getRowRequest.SingleRowQueryCriteria = criteria
374	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
375	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
376
377	getResp, error = client.GetRow(getRowRequest)
378	c.Check(error, IsNil)
379	c.Check(getResp, NotNil)
380
381	colmap := getResp.GetColumnMap()
382	c.Check(colmap, NotNil)
383
384	fmt.Println("TestPutGetRow finished")
385}
386
387func (s *TableStoreSuite) TestCreateTableAndPutRow(c *C) {
388	fmt.Println("TestCreateTableAndPutRow finished")
389
390	tableName := tableNamePrefix + "testpkschema"
391	deleteReq := new(DeleteTableRequest)
392	deleteReq.TableName = tableName
393	client.DeleteTable(deleteReq)
394
395	createtableRequest := new(CreateTableRequest)
396
397	tableMeta := new(TableMeta)
398	tableMeta.TableName = tableName
399	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
400	tableMeta.AddPrimaryKeyColumn("pk2", PrimaryKeyType_INTEGER)
401	tableMeta.AddPrimaryKeyColumn("pk3", PrimaryKeyType_BINARY)
402
403	tableOption := new(TableOption)
404
405	tableOption.TimeToAlive = -1
406	tableOption.MaxVersion = 3
407
408	reservedThroughput := new(ReservedThroughput)
409	reservedThroughput.Readcap = 0
410	reservedThroughput.Writecap = 0
411
412	createtableRequest.TableMeta = tableMeta
413	createtableRequest.TableOption = tableOption
414	createtableRequest.ReservedThroughput = reservedThroughput
415
416	_, error := client.CreateTable(createtableRequest)
417	c.Check(error, Equals, nil)
418
419	putRowRequest := new(PutRowRequest)
420	putRowChange := new(PutRowChange)
421	putRowChange.TableName = tableName
422	putPk := new(PrimaryKey)
423	putPk.AddPrimaryKeyColumn("pk1", "key2")
424	putPk.AddPrimaryKeyColumn("pk2", int64(5))
425	putPk.AddPrimaryKeyColumn("pk3", []byte("byteskey1"))
426	putRowChange.PrimaryKey = putPk
427
428	timeNow := time.Now().Unix() * 1000
429	putRowChange.AddColumnWithTimestamp("col1", "col1data1", timeNow)
430	putRowChange.AddColumn("col2", int64(100))
431	putRowChange.AddColumn("col3", float64(2.1))
432	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
433	putRowRequest.PutRowChange = putRowChange
434	_, error = client.PutRow(putRowRequest)
435	c.Check(error, Equals, nil)
436
437	fmt.Println("TestCreateTableAndPutRow finished")
438}
439
440func (s *TableStoreSuite) TestPutGetRowWithTimestamp(c *C) {
441	fmt.Println("TestPutGetRowWithTimestamp started")
442	putRowRequest := new(PutRowRequest)
443	putRowChange := new(PutRowChange)
444	putRowChange.TableName = defaultTableName
445	putPk := new(PrimaryKey)
446	putPk.AddPrimaryKeyColumn("pk1", "testtskey1")
447	putRowChange.PrimaryKey = putPk
448	timeNow := time.Now().Unix() * 1000
449	putRowChange.AddColumnWithTimestamp("col1", "col1data1", timeNow)
450	putRowChange.AddColumn("col2", int64(100))
451	putRowChange.AddColumn("col3", float64(2.1))
452	putRowChange.AddColumn("col4", true)
453	putRowChange.AddColumn("col5", int64(50))
454	putRowChange.AddColumn("col6", int64(60))
455	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
456	putRowRequest.PutRowChange = putRowChange
457	_, error := client.PutRow(putRowRequest)
458	c.Check(error, Equals, nil)
459
460	getRowRequest := new(GetRowRequest)
461	criteria := new(SingleRowQueryCriteria)
462	criteria.PrimaryKey = putPk
463	getRowRequest.SingleRowQueryCriteria = criteria
464	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
465	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
466	// getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow}
467	getResp, error := client.GetRow(getRowRequest)
468	c.Check(error, Equals, nil)
469	c.Check(getResp, NotNil)
470	c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1)
471	c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1")
472	c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, "testtskey1")
473	c.Check(len(getResp.Columns), Equals, 6)
474	c.Check(getResp.Columns[0].ColumnName, Equals, "col1")
475	c.Check(getResp.Columns[0].Value, Equals, "col1data1")
476	c.Check(getResp.Columns[0].Timestamp, Equals, timeNow)
477	c.Check(getResp.Columns[1].ColumnName, Equals, "col2")
478	c.Check(getResp.Columns[1].Value, Equals, int64(100))
479	c.Check(getResp.Columns[2].ColumnName, Equals, "col3")
480	c.Check(getResp.Columns[2].Value, Equals, float64(2.1))
481	c.Check(getResp.Columns[3].ColumnName, Equals, "col4")
482	c.Check(getResp.Columns[3].Value, Equals, true)
483	c.Check(getResp.Columns[4].ColumnName, Equals, "col5")
484	c.Check(getResp.Columns[4].Value, Equals, int64(50))
485	c.Check(getResp.Columns[5].ColumnName, Equals, "col6")
486	c.Check(getResp.Columns[5].Value, Equals, int64(60))
487
488	getRowRequest.SingleRowQueryCriteria.MaxVersion = 0
489	fmt.Println("timerange", timeNow)
490	getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1")
491	getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow - 1}
492	getResp2, error := client.GetRow(getRowRequest)
493	c.Check(error, Equals, nil)
494	c.Check(getResp2, NotNil)
495	c.Check(len(getResp2.PrimaryKey.PrimaryKeys), Equals, 0)
496
497	getRowRequest.SingleRowQueryCriteria.MaxVersion = 0
498	fmt.Println("timerange", timeNow)
499	getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1")
500	getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Start: timeNow + 1, End: timeNow + 2}
501	getResp2, error = client.GetRow(getRowRequest)
502	c.Check(error, Equals, nil)
503	c.Check(getResp2, NotNil)
504
505	getRowRequest.SingleRowQueryCriteria.MaxVersion = 0
506	fmt.Println("timerange", timeNow)
507	getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1")
508	getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow - 1}
509	getResp2, error = client.GetRow(getRowRequest)
510	c.Check(error, Equals, nil)
511	c.Check(getResp2, NotNil)
512	c.Check(len(getResp2.PrimaryKey.PrimaryKeys), Equals, 0)
513
514	fmt.Println("timerange", timeNow)
515	getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1")
516	getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Start: timeNow - 1, End: timeNow + 2}
517	getResp2, error = client.GetRow(getRowRequest)
518	c.Check(error, Equals, nil)
519	c.Check(getResp2, NotNil)
520	c.Check(len(getResp2.PrimaryKey.PrimaryKeys), Equals, 1)
521
522	fmt.Println("TestPutGetRowWithTimestamp finished")
523}
524
525func (s *TableStoreSuite) TestPutGetRowWithFilter(c *C) {
526	fmt.Println("TestPutGetRowWithFilter started")
527	putRowRequest := new(PutRowRequest)
528	putRowChange := new(PutRowChange)
529	putRowChange.TableName = defaultTableName
530	putPk := new(PrimaryKey)
531	putPk.AddPrimaryKeyColumn("pk1", "Key6")
532	putRowChange.PrimaryKey = putPk
533	putRowChange.AddColumn("col1", "col1data1")
534	putRowChange.AddColumn("col2", int64(100))
535	putRowChange.AddColumn("col3", float64(5.1))
536	putRowChange.AddColumn("col4", true)
537	putRowChange.AddColumn("col5", int64(50))
538	putRowChange.AddColumn("col6", int64(60))
539	putRowChange.AddColumn("col7", []byte("testbytes"))
540	putRowChange.AddColumn("col8", false)
541	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
542	clCondition1 := NewSingleColumnCondition("col2", CT_GREATER_EQUAL, int64(100))
543	clCondition2 := NewSingleColumnCondition("col5", CT_NOT_EQUAL, int64(20))
544	clCondition3 := NewSingleColumnCondition("col6", CT_LESS_THAN, int64(100))
545	clCondition4 := NewSingleColumnCondition("col4", CT_EQUAL, true)
546	clCondition5 := NewSingleColumnCondition("col1", CT_EQUAL, "col1data1")
547	clCondition6 := NewSingleColumnCondition("col3", CT_LESS_EQUAL, float64(5.1))
548	clCondition7 := NewSingleColumnCondition("col7", CT_EQUAL, []byte("testbytes"))
549	clCondition8 := NewSingleColumnCondition("col5", CT_GREATER_THAN, int64(20))
550
551	cf := NewCompositeColumnCondition(LO_AND)
552	cf.AddFilter(clCondition1)
553	cf.AddFilter(clCondition2)
554	cf.AddFilter(clCondition3)
555	cf.AddFilter(clCondition4)
556	cf.AddFilter(clCondition5)
557	cf.AddFilter(clCondition6)
558	cf.AddFilter(clCondition7)
559	cf.AddFilter(clCondition8)
560	putRowChange.SetColumnCondition(cf)
561
562	putRowRequest.PutRowChange = putRowChange
563	_, error := client.PutRow(putRowRequest)
564	c.Check(error, Equals, nil)
565
566	cf2 := NewCompositeColumnCondition(LO_OR)
567	cf2.AddFilter(clCondition7)
568	cf2.AddFilter(clCondition8)
569	cf3 := NewCompositeColumnCondition(LO_NOT)
570	clCondition9 := NewSingleColumnCondition("col5", CT_GREATER_THAN, int64(200))
571	cf3.AddFilter(clCondition9)
572	cf2.AddFilter(cf3)
573
574	getRowRequest := new(GetRowRequest)
575	criteria := new(SingleRowQueryCriteria)
576	criteria.PrimaryKey = putPk
577	getRowRequest.SingleRowQueryCriteria = criteria
578	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
579	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
580	getRowRequest.SingleRowQueryCriteria.SetFilter(cf2)
581	getResp, error := client.GetRow(getRowRequest)
582	c.Check(error, Equals, nil)
583	c.Check(getResp, NotNil)
584	c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1)
585	c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1")
586	c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, "Key6")
587	c.Check(len(getResp.Columns), Equals, 8)
588	c.Check(getResp.Columns[0].ColumnName, Equals, "col1")
589	c.Check(getResp.Columns[0].Value, Equals, "col1data1")
590	c.Check(getResp.Columns[1].ColumnName, Equals, "col2")
591	c.Check(getResp.Columns[1].Value, Equals, int64(100))
592	c.Check(getResp.Columns[2].ColumnName, Equals, "col3")
593	c.Check(getResp.Columns[2].Value, Equals, float64(5.1))
594	c.Check(getResp.Columns[3].ColumnName, Equals, "col4")
595	c.Check(getResp.Columns[3].Value, Equals, true)
596	c.Check(getResp.Columns[4].ColumnName, Equals, "col5")
597	c.Check(getResp.Columns[4].Value, Equals, int64(50))
598	c.Check(getResp.Columns[5].ColumnName, Equals, "col6")
599	c.Check(getResp.Columns[5].Value, Equals, int64(60))
600
601	getRowRequest = new(GetRowRequest)
602	criteria = new(SingleRowQueryCriteria)
603	criteria.PrimaryKey = putPk
604	getRowRequest.SingleRowQueryCriteria = criteria
605	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
606	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
607
608	pagedFilter := &PaginationFilter{}
609	pagedFilter.Limit = 3
610	pagedFilter.Offset = 1
611	getRowRequest.SingleRowQueryCriteria.SetFilter(pagedFilter)
612	getResp, error = client.GetRow(getRowRequest)
613	c.Check(error, Equals, nil)
614	c.Check(getResp, NotNil)
615	c.Check(len(getResp.Columns), Equals, 3)
616
617	getRowRequest = new(GetRowRequest)
618	criteria = new(SingleRowQueryCriteria)
619	criteria.PrimaryKey = putPk
620	getRowRequest.SingleRowQueryCriteria = criteria
621	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
622	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
623
624	getRowRequest.SingleRowQueryCriteria.SetStartColumn("col3")
625	pagedFilter = &PaginationFilter{}
626	pagedFilter.Limit = 3
627	pagedFilter.Offset = 1
628	getRowRequest.SingleRowQueryCriteria.SetFilter(pagedFilter)
629	getResp, error = client.GetRow(getRowRequest)
630	c.Check(error, Equals, nil)
631	c.Check(getResp, NotNil)
632	c.Check(getResp.Columns[0].ColumnName, Equals, "col4")
633	fmt.Println("TestPutGetRowWithFilter finished")
634}
635
636func (s *TableStoreSuite) TestPutUpdateDeleteRow(c *C) {
637	fmt.Println("TestPutUpdateDeleteRow started")
638	keyToUpdate := "pk1toupdate"
639	putRowRequest := new(PutRowRequest)
640	putRowChange := new(PutRowChange)
641	putRowChange.TableName = defaultTableName
642	putPk := new(PrimaryKey)
643	putPk.AddPrimaryKeyColumn("pk1", keyToUpdate)
644	putRowChange.PrimaryKey = putPk
645	putRowChange.AddColumn("col1", "col1data1")
646	timeNow := int64(time.Now().Unix() * 1000)
647	putRowChange.AddColumnWithTimestamp("col10", "col10data10", timeNow)
648	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
649	putRowRequest.PutRowChange = putRowChange
650	_, error := client.PutRow(putRowRequest)
651	c.Check(error, Equals, nil)
652
653	updateRowRequest := new(UpdateRowRequest)
654	updateRowChange := new(UpdateRowChange)
655	updateRowChange.TableName = defaultTableName
656	updatePk := new(PrimaryKey)
657	updatePk.AddPrimaryKeyColumn("pk1", keyToUpdate)
658	updateRowChange.PrimaryKey = updatePk
659	updateRowChange.DeleteColumn("col1")
660	updateRowChange.DeleteColumnWithTimestamp("col10", timeNow)
661	updateRowChange.PutColumn("col2", int64(77))
662	updateRowChange.PutColumn("col3", "newcol3")
663	updateRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST)
664	updateRowRequest.UpdateRowChange = updateRowChange
665	_, error = client.UpdateRow(updateRowRequest)
666	c.Check(error, Equals, nil)
667
668	getRowRequest := new(GetRowRequest)
669	criteria := new(SingleRowQueryCriteria)
670	criteria.PrimaryKey = putPk
671	getRowRequest.SingleRowQueryCriteria = criteria
672	getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName
673	getRowRequest.SingleRowQueryCriteria.MaxVersion = 1
674	getResp, error := client.GetRow(getRowRequest)
675	c.Check(error, Equals, nil)
676	c.Check(getResp, NotNil)
677
678	c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1)
679	c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1")
680	c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, keyToUpdate)
681	c.Check(len(getResp.Columns), Equals, 2)
682	c.Check(getResp.Columns[0].ColumnName, Equals, "col2")
683	c.Check(getResp.Columns[0].Value, Equals, int64(77))
684	c.Check(getResp.Columns[1].ColumnName, Equals, "col3")
685	c.Check(getResp.Columns[1].Value, Equals, "newcol3")
686
687	deleteRowReq := new(DeleteRowRequest)
688	deleteRowReq.DeleteRowChange = new(DeleteRowChange)
689	deleteRowReq.DeleteRowChange.TableName = defaultTableName
690	deletePk := new(PrimaryKey)
691	deletePk.AddPrimaryKeyColumn("pk1", keyToUpdate)
692	deleteRowReq.DeleteRowChange.PrimaryKey = deletePk
693	deleteRowReq.DeleteRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST)
694	clCondition1 := NewSingleColumnCondition("col2", CT_EQUAL, int64(77))
695	deleteRowReq.DeleteRowChange.SetColumnCondition(clCondition1)
696	resp, error := client.DeleteRow(deleteRowReq)
697	c.Check(error, Equals, nil)
698	fmt.Println(resp.ConsumedCapacityUnit.Write)
699	fmt.Println(resp.ConsumedCapacityUnit.Read)
700
701	_, error = invalidClient.UpdateRow(updateRowRequest)
702	c.Check(error, NotNil)
703
704	_, error = invalidClient.DeleteRow(deleteRowReq)
705	c.Check(error, NotNil)
706
707	fmt.Println("TestPutUpdateDeleteRow finished")
708}
709
710func (s *TableStoreSuite) TestBatchGetRow(c *C) {
711	fmt.Println("TestBatchGetRow started")
712	rowCount := 100
713	for i := 0; i < rowCount; i++ {
714		key := "batchkey" + strconv.Itoa(i)
715		value := "value" + strconv.Itoa(i)
716		PrepareDataInDefaultTable(key, value)
717	}
718
719	batchGetReq := &BatchGetRowRequest{}
720	mqCriteria := &MultiRowQueryCriteria{}
721
722	for i := 0; i < rowCount; i++ {
723		pkToGet := new(PrimaryKey)
724		key := "batchkey" + strconv.Itoa(i)
725		pkToGet.AddPrimaryKeyColumn("pk1", key)
726		mqCriteria.AddRow(pkToGet)
727	}
728	mqCriteria.MaxVersion = 1
729	mqCriteria.TableName = defaultTableName
730	batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
731	batchGetResponse, error := client.BatchGetRow(batchGetReq)
732	c.Check(error, Equals, nil)
733
734	c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1)
735	c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount)
736
737	for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] {
738		c.Check(rowToCheck.Index, Equals, int32(index))
739		c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName)
740		c.Check(rowToCheck.IsSucceed, Equals, true)
741		c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1)
742		c.Check(len(rowToCheck.Columns), Equals, 1)
743	}
744
745	batchGetReq = &BatchGetRowRequest{}
746	mqCriteria = &MultiRowQueryCriteria{}
747
748	for i := 0; i < rowCount; i++ {
749		pkToGet := new(PrimaryKey)
750		key := "batchkey" + strconv.Itoa(i)
751		pkToGet.AddPrimaryKeyColumn("pk1", key)
752		mqCriteria.AddRow(pkToGet)
753		mqCriteria.AddColumnToGet("col1")
754	}
755	timeNow := time.Now().Unix() * 1000
756	mqCriteria.TimeRange = &TimeRange{Start: timeNow - 10000, End: timeNow + 10000}
757	mqCriteria.TableName = defaultTableName
758	batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
759	batchGetResponse, error = client.BatchGetRow(batchGetReq)
760	c.Check(error, Equals, nil)
761	c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1)
762	c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount)
763
764	for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] {
765		c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName)
766		c.Check(rowToCheck.IsSucceed, Equals, true)
767		c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1)
768		c.Check(len(rowToCheck.Columns), Equals, 1)
769		c.Check(rowToCheck.Index, Equals, int32(index))
770	}
771
772	// test timerange
773	batchGetReq = &BatchGetRowRequest{}
774	mqCriteria = &MultiRowQueryCriteria{}
775
776	for i := 0; i < rowCount; i++ {
777		pkToGet := new(PrimaryKey)
778		key := "batchkey" + strconv.Itoa(i)
779		pkToGet.AddPrimaryKeyColumn("pk1", key)
780		mqCriteria.AddRow(pkToGet)
781	}
782	mqCriteria.TimeRange = &TimeRange{Start: timeNow + 10000, End: timeNow + 20000}
783	mqCriteria.TableName = defaultTableName
784	batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
785	batchGetResponse, error = client.BatchGetRow(batchGetReq)
786	c.Check(error, Equals, nil)
787
788	c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1)
789	c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount)
790
791	for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] {
792		c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName)
793		c.Check(rowToCheck.IsSucceed, Equals, true)
794		c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1)
795		c.Check(len(rowToCheck.Columns), Equals, 0)
796		c.Check(rowToCheck.Index, Equals, int32(index))
797	}
798	_, error = invalidClient.BatchGetRow(batchGetReq)
799	c.Check(error, NotNil)
800
801	fmt.Println("TestBatchGetRow started")
802}
803
804func (s *TableStoreSuite) TestBatchGetRowWithFilter(c *C) {
805	fmt.Println("TestBatchGetRowWithFilter started")
806	rowCount := 100
807	for i := 0; i < rowCount; i++ {
808		key := "filterbatchkey" + strconv.Itoa(i)
809		value1 := "col0value" + strconv.Itoa(i)
810		value2 := "col1value" + strconv.Itoa(i)
811		value3 := "col2value" + strconv.Itoa(i)
812		PrepareDataInDefaultTableWithMultiAttribute(key, value1, value2, value3)
813	}
814
815	// pagination filter
816	pagedFilter := &PaginationFilter{}
817	pagedFilter.Limit = 2
818	pagedFilter.Offset = 1
819
820	batchGetReq := &BatchGetRowRequest{}
821	mqCriteria := &MultiRowQueryCriteria{}
822	mqCriteria.SetFilter(pagedFilter)
823
824	for i := 0; i < rowCount; i++ {
825		pkToGet := new(PrimaryKey)
826		key := "filterbatchkey" + strconv.Itoa(i)
827		pkToGet.AddPrimaryKeyColumn("pk1", key)
828		mqCriteria.AddRow(pkToGet)
829	}
830
831	mqCriteria.MaxVersion = 1
832	mqCriteria.TableName = defaultTableName
833	batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
834	batchGetResponse, error := client.BatchGetRow(batchGetReq)
835	c.Check(error, Equals, nil)
836
837	c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1)
838	c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount)
839
840	for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] {
841		c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName)
842		c.Check(rowToCheck.IsSucceed, Equals, true)
843		c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1)
844		c.Check(len(rowToCheck.Columns), Equals, 2)
845		c.Check(rowToCheck.Index, Equals, int32(index))
846	}
847
848	// compsite filter
849	batchGetReq = &BatchGetRowRequest{}
850	clCondition1 := NewSingleColumnCondition("col1", CT_EQUAL, "col0value1")
851	clCondition2 := NewSingleColumnCondition("col2", CT_EQUAL, "col1value1")
852
853	cf := NewCompositeColumnCondition(LO_AND)
854	cf.AddFilter(clCondition1)
855	cf.AddFilter(clCondition2)
856
857	mqCriteria = &MultiRowQueryCriteria{}
858	mqCriteria.SetFilter(cf)
859
860	for i := 0; i < rowCount; i++ {
861		pkToGet := new(PrimaryKey)
862		key := "filterbatchkey" + strconv.Itoa(i)
863		pkToGet.AddPrimaryKeyColumn("pk1", key)
864		mqCriteria.AddRow(pkToGet)
865	}
866
867	mqCriteria.MaxVersion = 1
868	mqCriteria.TableName = defaultTableName
869	batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria)
870	batchGetResponse, error = client.BatchGetRow(batchGetReq)
871	c.Check(error, Equals, nil)
872
873	c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1)
874	c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount)
875
876	count := 0
877	for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] {
878		c.Check(rowToCheck.Index, Equals, int32(index))
879		c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName)
880		c.Check(rowToCheck.IsSucceed, Equals, true)
881
882		if len(rowToCheck.PrimaryKey.PrimaryKeys) > 0 {
883			c.Check(len(rowToCheck.Columns), Equals, 3)
884			count++
885		}
886	}
887	c.Check(count, Equals, 1)
888
889	fmt.Println("TestBatchGetRowWithFilter finished")
890}
891
892func (s *TableStoreSuite) TestBatchWriteRow(c *C) {
893	fmt.Println("TestBatchWriteRow started")
894
895	PrepareDataInDefaultTable("updateinbatchkey1", "updateinput1")
896	PrepareDataInDefaultTable("deleteinbatchkey1", "deleteinput1")
897	batchWriteReq := &BatchWriteRowRequest{}
898
899	rowToPut1 := CreatePutRowChange("putinbatchkey1", "datainput1")
900	rowToPut2 := CreatePutRowChange("putinbatchkey2", "datainput2")
901
902	updateRowChange := new(UpdateRowChange)
903	updateRowChange.TableName = defaultTableName
904	updatePk := new(PrimaryKey)
905	updatePk.AddPrimaryKeyColumn("pk1", "updateinbatchkey1")
906	updateRowChange.PrimaryKey = updatePk
907	updateRowChange.DeleteColumn("col1")
908	updateRowChange.PutColumn("col2", int64(77))
909	updateRowChange.PutColumn("col3", "newcol3")
910	updateRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST)
911
912	deleteRowChange := new(DeleteRowChange)
913	deleteRowChange.TableName = defaultTableName
914	deletePk := new(PrimaryKey)
915	deletePk.AddPrimaryKeyColumn("pk1", "deleteinbatchkey1")
916	deleteRowChange.PrimaryKey = deletePk
917	deleteRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST)
918
919	batchWriteReq.AddRowChange(rowToPut1)
920	batchWriteReq.AddRowChange(rowToPut2)
921	batchWriteReq.AddRowChange(updateRowChange)
922	batchWriteReq.AddRowChange(deleteRowChange)
923
924	batchWriteResponse, error := client.BatchWriteRow(batchWriteReq)
925	c.Check(error, Equals, nil)
926	c.Check(len(batchWriteResponse.TableToRowsResult), Equals, 1)
927
928	for index, rowToCheck := range batchWriteResponse.TableToRowsResult[defaultTableName] {
929		c.Check(rowToCheck.Index, Equals, int32(index))
930		c.Check(rowToCheck.TableName, Equals, defaultTableName)
931		c.Check(rowToCheck.IsSucceed, Equals, true)
932	}
933
934	_, error = invalidClient.BatchWriteRow(batchWriteReq)
935	c.Check(error, NotNil)
936
937	fmt.Println("TestBatchWriteRow finished")
938}
939
940func (s *TableStoreSuite) TestGetRange(c *C) {
941	fmt.Println("TestGetRange started")
942	rowCount := 9
943	timeNow := time.Now().Unix() * 1000
944	for i := 0; i < rowCount; i++ {
945		key := "getrange" + strconv.Itoa(i)
946		value := "value" + strconv.Itoa(i)
947		PrepareDataInDefaultTableWithTimestamp(key, value, timeNow)
948	}
949
950	getRangeRequest := &GetRangeRequest{}
951	rangeRowQueryCriteria := &RangeRowQueryCriteria{}
952	rangeRowQueryCriteria.TableName = defaultTableName
953	start := 1
954	end := 8
955	startPK := new(PrimaryKey)
956	startPK.AddPrimaryKeyColumn("pk1", "getrange"+strconv.Itoa(start))
957	endPK := new(PrimaryKey)
958	endPK.AddPrimaryKeyColumn("pk1", "getrange"+strconv.Itoa(end))
959	rangeRowQueryCriteria.StartPrimaryKey = startPK
960	rangeRowQueryCriteria.EndPrimaryKey = endPK
961	rangeRowQueryCriteria.Direction = FORWARD
962	rangeRowQueryCriteria.MaxVersion = 1
963	rangeRowQueryCriteria.ColumnsToGet = []string{"col1"}
964	getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
965
966	fmt.Println("check", rangeRowQueryCriteria.ColumnsToGet)
967	fmt.Println("check2", getRangeRequest.RangeRowQueryCriteria.ColumnsToGet)
968	getRangeResp, error := client.GetRange(getRangeRequest)
969	c.Check(error, Equals, nil)
970	c.Check(getRangeResp.Rows, NotNil)
971	count := end - start
972	c.Check(len(getRangeResp.Rows), Equals, count)
973	c.Check(len(getRangeResp.Rows[0].Columns), Equals, 1)
974	c.Check(getRangeResp.NextStartPrimaryKey, IsNil)
975
976	getRangeRequest = &GetRangeRequest{}
977	rangeRowQueryCriteria = &RangeRowQueryCriteria{}
978	rangeRowQueryCriteria.TableName = defaultTableName
979
980	rangeRowQueryCriteria.StartPrimaryKey = endPK
981	rangeRowQueryCriteria.EndPrimaryKey = startPK
982	rangeRowQueryCriteria.Direction = BACKWARD
983	rangeRowQueryCriteria.MaxVersion = 1
984	getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
985	getRangeResp, error = client.GetRange(getRangeRequest)
986	c.Check(error, Equals, nil)
987	c.Check(getRangeResp.Rows, NotNil)
988
989	fmt.Println("use time range to query rows")
990
991	rangeRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow - 100001}
992	getRangeResp, error = client.GetRange(getRangeRequest)
993	c.Check(error, NotNil)
994	fmt.Println(error)
995
996	fmt.Println("use time range to query rows 2")
997	rangeRowQueryCriteria.TimeRange = &TimeRange{Start: timeNow + 1, End: timeNow + 2}
998	getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
999	getRangeResp2, error := client.GetRange(getRangeRequest)
1000
1001	c.Check(error, Equals, nil)
1002	c.Check(getRangeResp2.Rows, NotNil)
1003	c.Check(len(getRangeResp2.Rows), Equals, count)
1004	c.Check(len(getRangeResp2.Rows[0].Columns), Equals, 0)
1005
1006	_, error = invalidClient.GetRange(getRangeRequest)
1007	c.Check(error, NotNil)
1008	fmt.Println("TestGetRange finished")
1009}
1010
1011func (s *TableStoreSuite) TestGetRangeWithPagination(c *C) {
1012	fmt.Println("TestGetRangeWithPagination started")
1013	rowCount := 9
1014	for i := 0; i < rowCount; i++ {
1015		key := "testrangequery" + strconv.Itoa(i)
1016		value := "value" + strconv.Itoa(i)
1017		PrepareDataInDefaultTable(key, value)
1018	}
1019
1020	getRangeRequest := &GetRangeRequest{}
1021	rangeRowQueryCriteria := &RangeRowQueryCriteria{}
1022	rangeRowQueryCriteria.TableName = defaultTableName
1023	start := 1
1024	end := 8
1025	var limit int32 = 3
1026	startPK := new(PrimaryKey)
1027	startPK.AddPrimaryKeyColumn("pk1", "testrangequery"+strconv.Itoa(start))
1028	endPK := new(PrimaryKey)
1029	endPK.AddPrimaryKeyColumn("pk1", "testrangequery"+strconv.Itoa(end))
1030	rangeRowQueryCriteria.StartPrimaryKey = startPK
1031	rangeRowQueryCriteria.EndPrimaryKey = endPK
1032	rangeRowQueryCriteria.Direction = FORWARD
1033	rangeRowQueryCriteria.MaxVersion = 1
1034	rangeRowQueryCriteria.Limit = limit
1035	getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
1036
1037	getRangeResp, error := client.GetRange(getRangeRequest)
1038
1039	c.Check(error, Equals, nil)
1040	c.Check(getRangeResp.Rows, NotNil)
1041
1042	c.Check(len(getRangeResp.Rows), Equals, int(limit))
1043	c.Check(getRangeResp.NextStartPrimaryKey, NotNil)
1044	fmt.Println("TestGetRangeWithPagination finished")
1045}
1046
1047func (s *TableStoreSuite) TestGetRangeWithFilter(c *C) {
1048	fmt.Println("TestGetRange started")
1049	rowCount := 20
1050	timeNow := time.Now().Unix() * 1000
1051	for i := 0; i < rowCount; i++ {
1052		key := "zgetrangetest" + strconv.Itoa(i)
1053		value := "value" + strconv.Itoa(i)
1054		PrepareDataInRangeTableWithTimestamp("pk1", key, value, timeNow)
1055	}
1056
1057	for i := 0; i < rowCount; i++ {
1058		key := "zgetrangetest2" + strconv.Itoa(i)
1059		value := "value" + strconv.Itoa(i)
1060		PrepareDataInRangeTableWithTimestamp("pk2", key, value, timeNow)
1061	}
1062
1063	for i := 0; i < rowCount; i++ {
1064		key := "zgetrangetest3" + strconv.Itoa(i)
1065		value := "value" + strconv.Itoa(i)
1066		PrepareDataInRangeTableWithTimestamp("pk3", key, value, timeNow)
1067	}
1068
1069	getRangeRequest := &GetRangeRequest{}
1070	rangeRowQueryCriteria := &RangeRowQueryCriteria{}
1071	rangeRowQueryCriteria.TableName = rangeQueryTableName
1072
1073	startPK := new(PrimaryKey)
1074	startPK.AddPrimaryKeyColumnWithMinValue("pk1")
1075	startPK.AddPrimaryKeyColumnWithMinValue("pk2")
1076	endPK := new(PrimaryKey)
1077	endPK.AddPrimaryKeyColumnWithMaxValue("pk1")
1078	endPK.AddPrimaryKeyColumnWithMaxValue("pk2")
1079	rangeRowQueryCriteria.StartPrimaryKey = startPK
1080	rangeRowQueryCriteria.EndPrimaryKey = endPK
1081	rangeRowQueryCriteria.Direction = FORWARD
1082	rangeRowQueryCriteria.MaxVersion = 1
1083	filter := NewCompositeColumnCondition(LogicalOperator(LO_AND))
1084	filter1 := NewSingleColumnCondition("pk2", ComparatorType(CT_GREATER_EQUAL), "pk3")
1085	filter2 := NewSingleColumnCondition("pk2", ComparatorType(CT_LESS_EQUAL), "pk3")
1086	filter.AddFilter(filter2)
1087	filter.AddFilter(filter1)
1088	rangeRowQueryCriteria.Filter = filter
1089	getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
1090
1091	getRangeResp, error := client.GetRange(getRangeRequest)
1092	c.Check(error, Equals, nil)
1093	fmt.Println(getRangeResp)
1094	fmt.Println(getRangeResp.NextStartPrimaryKey)
1095	fmt.Println(getRangeResp.Rows)
1096	//fmt.Println(getRangeResp.NextStartPrimaryKey)
1097	//c.Check(getRangeResp.Rows, NotNil)
1098
1099	fmt.Println("TestGetRange with filter finished")
1100}
1101
1102func (s *TableStoreSuite) TestGetRangeWithMinMaxValue(c *C) {
1103	fmt.Println("TestGetRangeWithMinMaxValue started")
1104
1105	getRangeRequest := &GetRangeRequest{}
1106	rangeRowQueryCriteria := &RangeRowQueryCriteria{}
1107	rangeRowQueryCriteria.TableName = defaultTableName
1108
1109	var limit int32 = 8
1110	startPK := new(PrimaryKey)
1111	startPK.AddPrimaryKeyColumnWithMinValue("pk1")
1112	endPK := new(PrimaryKey)
1113	endPK.AddPrimaryKeyColumnWithMaxValue("pk1")
1114	rangeRowQueryCriteria.StartPrimaryKey = startPK
1115	rangeRowQueryCriteria.EndPrimaryKey = endPK
1116	rangeRowQueryCriteria.Direction = FORWARD
1117	rangeRowQueryCriteria.MaxVersion = 1
1118	rangeRowQueryCriteria.Limit = limit
1119	getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria
1120
1121	getRangeResp, error := client.GetRange(getRangeRequest)
1122
1123	c.Check(error, Equals, nil)
1124	c.Check(getRangeResp.Rows, NotNil)
1125
1126	c.Check(len(getRangeResp.Rows), Equals, int(limit))
1127	c.Check(getRangeResp.NextStartPrimaryKey, NotNil)
1128	fmt.Println("TestGetRangeWithMinMaxValue finished")
1129}
1130
1131func (s *TableStoreSuite) TestPutRowsWorkload(c *C) {
1132	fmt.Println("TestPutRowsWorkload started")
1133
1134	start := time.Now().UnixNano()
1135
1136	isFinished := make(chan bool)
1137	totalCount := 100
1138	for i := 0; i < totalCount; i++ {
1139		value := i * 10000
1140		go func(index int) {
1141			for j := 0; j < 100; j++ {
1142				currentIndex := index + j
1143				rowToPut1 := CreatePutRowChange("workloadtestkey"+strconv.Itoa(currentIndex), "perfdata1")
1144				putRowRequest := new(PutRowRequest)
1145				putRowRequest.PutRowChange = rowToPut1
1146				_, error := client.PutRow(putRowRequest)
1147				if error != nil {
1148					fmt.Println("put row error", error)
1149				}
1150				c.Check(error, IsNil)
1151			}
1152
1153			isFinished <- true
1154		}(value)
1155	}
1156
1157	/*go func(){
1158		time.Sleep(time.Millisecond * 1000 * 10)
1159		close(isFinished)
1160	}()*/
1161
1162	count := 0
1163	for _ = range isFinished {
1164		count++
1165		fmt.Println("catched count is:", count)
1166		if count >= totalCount {
1167			close(isFinished)
1168		}
1169	}
1170	c.Check(count, Equals, totalCount)
1171	end := time.Now().UnixNano()
1172
1173	totalCost := (end - start) / 1000000
1174	fmt.Println("total cost:", totalCost)
1175	c.Check(totalCost < 30*1000, Equals, true)
1176
1177	time.Sleep(time.Millisecond * 20)
1178	fmt.Println("TestPutRowsWorkload finished")
1179}
1180
1181func (s *TableStoreSuite) TestFailureCase(c *C) {
1182	tableName := randStringRunes(200)
1183	createtableRequest := new(CreateTableRequest)
1184	tableMeta := new(TableMeta)
1185	tableMeta.TableName = tableName
1186	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
1187	tableOption := new(TableOption)
1188	tableOption.TimeToAlive = -1
1189	tableOption.MaxVersion = 3
1190	reservedThroughput := new(ReservedThroughput)
1191	reservedThroughput.Readcap = 0
1192	reservedThroughput.Writecap = 0
1193	createtableRequest.TableMeta = tableMeta
1194	createtableRequest.TableOption = tableOption
1195	createtableRequest.ReservedThroughput = reservedThroughput
1196	_, error := client.CreateTable(createtableRequest)
1197	c.Check(error, NotNil)
1198	c.Check(error.Error(), Equals, errTableNameTooLong(tableName).Error())
1199
1200	createtableRequest = new(CreateTableRequest)
1201	tableMeta = new(TableMeta)
1202	tableMeta.TableName = tableNamePrefix + "pktomuch"
1203
1204	tableOption = new(TableOption)
1205	tableOption.TimeToAlive = -1
1206	tableOption.MaxVersion = 3
1207	reservedThroughput = new(ReservedThroughput)
1208	reservedThroughput.Readcap = 0
1209	reservedThroughput.Writecap = 0
1210	createtableRequest.TableMeta = tableMeta
1211	createtableRequest.TableOption = tableOption
1212	createtableRequest.ReservedThroughput = reservedThroughput
1213	_, error = client.CreateTable(createtableRequest)
1214	c.Check(error, NotNil)
1215	c.Check(error.Error(), Equals, errCreateTableNoPrimaryKey.Error())
1216
1217	tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
1218	tableMeta.AddPrimaryKeyColumn("pk2", PrimaryKeyType_STRING)
1219	tableMeta.AddPrimaryKeyColumn("pk3", PrimaryKeyType_STRING)
1220	tableMeta.AddPrimaryKeyColumn("pk4", PrimaryKeyType_STRING)
1221	tableMeta.AddPrimaryKeyColumn("pk5", PrimaryKeyType_STRING)
1222
1223	_, error = client.CreateTable(createtableRequest)
1224	c.Check(error, NotNil)
1225	c.Check(error.Error(), Equals, errPrimaryKeyTooMuch.Error())
1226
1227	request := &PutRowRequest{}
1228	_, error = client.PutRow(request)
1229	c.Check(error, IsNil)
1230
1231	_, error = client.PutRow(nil)
1232	c.Check(error, IsNil)
1233
1234	_, err := invalidClient.ListTable()
1235	c.Check(err, NotNil)
1236
1237	tableName = tableNamePrefix + "tablenotexist"
1238	deleteReq := new(DeleteTableRequest)
1239	deleteReq.TableName = tableName
1240	_, err = client.DeleteTable(deleteReq)
1241	c.Check(err, NotNil)
1242
1243	_, err = invalidClient.ListTable()
1244	c.Check(err, NotNil)
1245
1246	updateTableReq := new(UpdateTableRequest)
1247	updateTableReq.TableName = defaultTableName
1248	updateTableReq.TableOption = new(TableOption)
1249	updateTableReq.TableOption.TimeToAlive = -1
1250	updateTableReq.TableOption.MaxVersion = 5
1251	updateTableReq.ReservedThroughput = &ReservedThroughput{}
1252	updateTableReq.ReservedThroughput.Readcap = 0
1253
1254	_, error = invalidClient.UpdateTable(updateTableReq)
1255	c.Assert(error, NotNil)
1256
1257	describeTableReq := new(DescribeTableRequest)
1258	describeTableReq.TableName = defaultTableName
1259	_, error = invalidClient.DescribeTable(describeTableReq)
1260	c.Assert(error, NotNil)
1261}
1262
1263func (s *TableStoreSuite) TestMockHttpClientCase(c *C) {
1264	fmt.Println("TestMockHttpClientCase started")
1265	currentGetHttpClientFunc = func() IHttpClient {
1266		return &mockHttpClient{}
1267	}
1268
1269	tempClient := NewClientWithConfig("test", "a", "b", "c", "d", NewDefaultTableStoreConfig())
1270	putRowRequest := new(PutRowRequest)
1271	putRowChange := new(PutRowChange)
1272	putRowChange.TableName = defaultTableName
1273	putPk := new(PrimaryKey)
1274	putPk.AddPrimaryKeyColumn("pk1", "mockkey1")
1275	putRowChange.PrimaryKey = putPk
1276	putRowChange.AddColumn("col1", "col1data1")
1277	putRowChange.AddColumn("col2", int64(100))
1278	putRowChange.AddColumn("col3", float64(2.1))
1279	putRowChange.SetCondition(RowExistenceExpectation_EXPECT_NOT_EXIST)
1280	putRowRequest.PutRowChange = putRowChange
1281	data := tempClient.httpClient.(*mockHttpClient)
1282
1283	data.error = fmt.Errorf("test")
1284	_, error := tempClient.PutRow(putRowRequest)
1285	c.Check(error, Equals, data.error)
1286
1287	data.response = &http.Response{}
1288	_, error = tempClient.PutRow(putRowRequest)
1289	c.Check(error, Equals, data.error)
1290
1291	/*data.error = nil
1292	_, error = tempClient.PutRow(putRowRequest)
1293	c.Check(error, Equals, data.error)*/
1294
1295	currentGetHttpClientFunc = func() IHttpClient {
1296		return &TableStoreHttpClient{}
1297	}
1298
1299	fmt.Println("TestMockHttpClientCase finished")
1300}
1301
1302func (s *TableStoreSuite) TestUnit(c *C) {
1303	otshead := createOtsHeaders("test")
1304	otshead.set(xOtsApiversion, ApiVersion)
1305	_, error := otshead.signature(getRowUri, "POST", "test")
1306	c.Check(error, NotNil)
1307
1308	otshead.set(xOtsDate, "any")
1309	otshead.set(xOtsApiversion, "any")
1310	otshead.set(xOtsAccesskeyid, "any")
1311	otshead.set(xOtsContentmd5, "any")
1312	otshead.set(xOtsInstanceName, "any")
1313
1314	otshead.headers = nil
1315	otshead.set("abc", "def")
1316
1317	result := otshead.search("zz")
1318	c.Check(result, IsNil)
1319
1320	tempClient := NewClient("a", "b", "c", "d", SetSth())
1321	c.Check(tempClient, NotNil)
1322	config := NewDefaultTableStoreConfig()
1323	tempClient = NewClientWithConfig("a", "b", "c", "d", "e", config)
1324	c.Check(tempClient, NotNil)
1325
1326	errorCode := INTERNAL_SERVER_ERROR
1327	tsClient := client.(*TableStoreClient)
1328	value := getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 10, time.Now().Add(time.Second*1), 10, getRowUri)
1329	c.Check(value == 0, Equals, true)
1330
1331	errorCode = ROW_OPERATION_CONFLICT
1332	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getRowUri)
1333	c.Check(value > 0, Equals, true)
1334
1335	errorCode = STORAGE_TIMEOUT
1336	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, putRowUri)
1337	c.Check(value == 0, Equals, true)
1338
1339	errorCode = STORAGE_TIMEOUT
1340	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getRowUri)
1341	c.Check(value > 0, Equals, true)
1342
1343	errorCode = STORAGE_TIMEOUT
1344	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), MaxRetryInterval, getRowUri)
1345	c.Check(value == MaxRetryInterval, Equals, true)
1346
1347	// stream api
1348	errorCode = STORAGE_TIMEOUT
1349	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getStreamRecordUri)
1350	c.Check(value > 0, Equals, true)
1351
1352	// 502
1353	errorCode = SERVER_UNAVAILABLE
1354	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: "bad gateway"}, 1, time.Now().Add(time.Second*1), 10, getStreamRecordUri)
1355	c.Check(value > 0, Equals, true)
1356
1357	// 502 write
1358	errorCode = SERVER_UNAVAILABLE
1359	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: "bad gateway"}, 1, time.Now().Add(time.Second*1), 10, putRowUri)
1360	c.Check(value == 0, Equals, true)
1361
1362	// 400 normal
1363	errorCode = "OTSPermissionDenied"
1364	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, putRowUri)
1365	c.Check(value == 0, Equals, true)
1366
1367	// 400 raw http
1368	errorCode = OTS_CLIENT_UNKNOWN
1369	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getRowUri)
1370	c.Check(value == 0, Equals, true)
1371
1372	// storage 503 put
1373	errorCode = STORAGE_SERVER_BUSY
1374	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, putRowUri)
1375	c.Check(value > 0, Equals, true)
1376
1377	// storage 503 desc stream
1378	errorCode = STORAGE_SERVER_BUSY
1379	value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, describeStreamUri)
1380	c.Check(value > 0, Equals, true)
1381
1382	// EOF
1383	value = getNextPause(tsClient, io.EOF, 1, time.Now().Add(time.Second*1), 10, putRowUri)
1384	c.Check(value > 0, Equals, true)
1385
1386	// connection rest
1387	value = getNextPause(tsClient, syscall.ECONNRESET, 1, time.Now().Add(time.Second*1), 10, putRowUri)
1388	c.Check(value > 0, Equals, true)
1389
1390	getResp := &GetRowResponse{}
1391	colMap := getResp.GetColumnMap()
1392	c.Check(colMap, NotNil)
1393
1394	getResp = &GetRowResponse{}
1395	col1 := &AttributeColumn{ColumnName: "col1", Value: "value1"}
1396	col2 := &AttributeColumn{ColumnName: "col1", Value: "value2"}
1397	col3 := &AttributeColumn{ColumnName: "col2", Value: "value3"}
1398
1399	getResp.Columns = append(getResp.Columns, col1)
1400	getResp.Columns = append(getResp.Columns, col2)
1401	getResp.Columns = append(getResp.Columns, col3)
1402	colMap = getResp.GetColumnMap()
1403	c.Check(colMap, NotNil)
1404	cols := colMap.Columns["col1"]
1405	c.Check(cols, NotNil)
1406	c.Check(len(cols), Equals, 2)
1407
1408	cols2 := colMap.Columns["col2"]
1409	c.Check(cols2, NotNil)
1410	c.Check(len(cols2), Equals, 1)
1411
1412	cols3, _ := colMap.GetRange(1, 1)
1413
1414	c.Check(cols3, NotNil)
1415	c.Check(len(cols3), Equals, 1)
1416
1417	var resp2 *GetRowResponse
1418	resp2 = nil
1419	c.Check(resp2.GetColumnMap(), IsNil)
1420}
1421
1422func SetSth() ClientOption {
1423	return func(client *TableStoreClient) {
1424		fmt.Println(client.accessKeyId)
1425	}
1426}
1427
1428func CreatePutRowChange(pkValue, colValue string) *PutRowChange {
1429	putRowChange := new(PutRowChange)
1430	putRowChange.TableName = defaultTableName
1431	putPk := new(PrimaryKey)
1432	putPk.AddPrimaryKeyColumn("pk1", pkValue)
1433	putRowChange.PrimaryKey = putPk
1434	putRowChange.AddColumn("col1", colValue)
1435	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
1436	return putRowChange
1437}
1438
// mockHttpClient is a test double that answers every request with a canned
// response and error instead of performing any network I/O.
type mockHttpClient struct {
	response   *http.Response // returned as-is by Do
	error      error          // returned as-is by Do
	httpClient *http.Client   // stored by New; never used by Do
}

// Do ignores req and returns the configured response and error.
func (m *mockHttpClient) Do(req *http.Request) (*http.Response, error) {
	resp, err := m.response, m.error
	return resp, err
}

// New records the given http.Client on the mock.
func (m *mockHttpClient) New(client *http.Client) {
	m.httpClient = client
}
1452
// letterRunes is the alphabet randStringRunes draws from: ASCII letters only.
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

// randStringRunes returns a random string of n ASCII letters. It returns the
// empty string when n is zero or negative.
//
// The generator is seeded with nanosecond resolution so that two calls made
// within the same second do not produce the same string.
func randStringRunes(n int) string {
	if n <= 0 {
		return ""
	}
	random := rand.New(rand.NewSource(time.Now().UnixNano()))

	b := make([]rune, n)
	for i := range b {
		b[i] = letterRunes[random.Intn(len(letterRunes))]
	}
	return string(b)
}
1464
1465func PrepareDataInDefaultTable(key string, value string) error {
1466	putRowRequest := new(PutRowRequest)
1467	putRowChange := new(PutRowChange)
1468	putRowChange.TableName = defaultTableName
1469	putPk := new(PrimaryKey)
1470	putPk.AddPrimaryKeyColumn("pk1", key)
1471	putRowChange.AddColumn("col1", value)
1472	putRowChange.PrimaryKey = putPk
1473	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
1474	putRowRequest.PutRowChange = putRowChange
1475	_, error := client.PutRow(putRowRequest)
1476	return error
1477}
1478
1479func PrepareDataInDefaultTableWithMultiAttribute(key string, value1 string, value2 string, value3 string) error {
1480	putRowRequest := new(PutRowRequest)
1481	putRowChange := new(PutRowChange)
1482	putRowChange.TableName = defaultTableName
1483	putPk := new(PrimaryKey)
1484	putPk.AddPrimaryKeyColumn("pk1", key)
1485	putRowChange.AddColumn("col1", value1)
1486	putRowChange.AddColumn("col2", value2)
1487	putRowChange.AddColumn("col3", value3)
1488	putRowChange.PrimaryKey = putPk
1489	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
1490	putRowRequest.PutRowChange = putRowChange
1491	_, error := client.PutRow(putRowRequest)
1492	return error
1493}
1494
1495func PrepareDataInDefaultTableWithTimestamp(key string, value string, timeNow int64) error {
1496	putRowRequest := new(PutRowRequest)
1497	putRowChange := new(PutRowChange)
1498	putRowChange.TableName = defaultTableName
1499	putPk := new(PrimaryKey)
1500	putPk.AddPrimaryKeyColumn("pk1", key)
1501	putRowChange.PrimaryKey = putPk
1502	putRowChange.AddColumnWithTimestamp("col1", value, timeNow)
1503	putRowChange.AddColumnWithTimestamp("col2", value, timeNow)
1504	putRowChange.AddColumnWithTimestamp("col3", value, timeNow)
1505	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
1506	putRowRequest.PutRowChange = putRowChange
1507	_, error := client.PutRow(putRowRequest)
1508	return error
1509}
1510
1511func PrepareDataInRangeTableWithTimestamp(key1 string, key2 string, value string, timeNow int64) error {
1512	putRowRequest := new(PutRowRequest)
1513	putRowChange := new(PutRowChange)
1514	putRowChange.TableName = rangeQueryTableName
1515	putPk := new(PrimaryKey)
1516	putPk.AddPrimaryKeyColumn("pk1", key1)
1517	putPk.AddPrimaryKeyColumn("pk2", key2)
1518	putRowChange.PrimaryKey = putPk
1519	putRowChange.AddColumnWithTimestamp("col1", value, timeNow)
1520	putRowChange.SetCondition(RowExistenceExpectation_IGNORE)
1521	putRowRequest.PutRowChange = putRowChange
1522	_, error := client.PutRow(putRowRequest)
1523	return error
1524}
1525
1526func (s *TableStoreSuite) TestListStream(c *C) {
1527	tableName := defaultTableName + "_ListStream"
1528	fmt.Printf("TestListStream starts on table %s\n", tableName)
1529	{
1530		err := PrepareTable(tableName)
1531		c.Assert(err, IsNil)
1532	}
1533	defer client.DeleteTable(&DeleteTableRequest{TableName: tableName})
1534	{
1535		resp, err := client.DescribeTable(&DescribeTableRequest{TableName: tableName})
1536		c.Assert(err, IsNil)
1537		c.Assert(resp.StreamDetails, NotNil)
1538		c.Assert(resp.StreamDetails.EnableStream, Equals, false)
1539		c.Assert(resp.StreamDetails.StreamId, IsNil)
1540		c.Assert(resp.StreamDetails.ExpirationTime, Equals, int32(0))
1541		c.Assert(resp.StreamDetails.LastEnableTime, Equals, int64(0))
1542	}
1543	{
1544		resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName})
1545		c.Assert(err, IsNil)
1546		fmt.Printf("%v\n", resp)
1547		c.Assert(len(resp.Streams), Equals, 0)
1548	}
1549	{
1550		resp, err := client.UpdateTable(&UpdateTableRequest{
1551			TableName:  tableName,
1552			StreamSpec: &StreamSpecification{EnableStream: true, ExpirationTime: 24}})
1553		c.Assert(err, IsNil)
1554		c.Assert(resp.StreamDetails, NotNil)
1555	}
1556	{
1557		resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName})
1558		c.Assert(err, IsNil)
1559		fmt.Printf("%#v\n", resp)
1560		c.Assert(len(resp.Streams), Equals, 1)
1561	}
1562	{
1563		resp, err := client.DescribeTable(&DescribeTableRequest{TableName: tableName})
1564		c.Assert(err, IsNil)
1565		c.Assert(resp.StreamDetails, NotNil)
1566		fmt.Printf("%#v\n", resp)
1567		c.Assert(resp.StreamDetails.EnableStream, Equals, true)
1568		c.Assert(resp.StreamDetails.StreamId, NotNil)
1569		c.Assert(resp.StreamDetails.ExpirationTime, Equals, int32(24))
1570		c.Assert(resp.StreamDetails.LastEnableTime > 0, Equals, true)
1571	}
1572	fmt.Println("TestListStream finish")
1573}
1574
1575func (s *TableStoreSuite) TestCreateTableWithStream(c *C) {
1576	tableName := defaultTableName + "_CreateTableWithStream"
1577	fmt.Printf("TestCreateTableWithStream starts on table %s\n", tableName)
1578	{
1579		req := CreateTableRequest{}
1580		tableMeta := TableMeta{}
1581		tableMeta.TableName = tableName
1582		tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
1583		req.TableMeta = &tableMeta
1584
1585		tableOption := TableOption{}
1586		tableOption.TimeToAlive = -1
1587		tableOption.MaxVersion = 3
1588		req.TableOption = &tableOption
1589
1590		req.ReservedThroughput = &ReservedThroughput{Readcap: 0, Writecap: 0}
1591
1592		req.StreamSpec = &StreamSpecification{EnableStream: true, ExpirationTime: 24}
1593
1594		_, err := client.CreateTable(&req)
1595		c.Assert(err, IsNil)
1596	}
1597	defer client.DeleteTable(&DeleteTableRequest{TableName: tableName})
1598	{
1599		resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName})
1600		c.Assert(err, IsNil)
1601		fmt.Printf("%#v\n", resp)
1602		c.Assert(len(resp.Streams), Equals, 1)
1603	}
1604	fmt.Println("TestCreateTableWithStream finish")
1605}
1606
1607func (s *TableStoreSuite) TestStream(c *C) {
1608	tableName := defaultTableName + "_Stream"
1609	fmt.Printf("TestCreateTableWithStream starts on table %s\n", tableName)
1610	{
1611		req := CreateTableRequest{}
1612		tableMeta := TableMeta{}
1613		tableMeta.TableName = tableName
1614		tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING)
1615		req.TableMeta = &tableMeta
1616
1617		tableOption := TableOption{}
1618		tableOption.TimeToAlive = -1
1619		tableOption.MaxVersion = 3
1620		req.TableOption = &tableOption
1621
1622		req.ReservedThroughput = &ReservedThroughput{Readcap: 0, Writecap: 0}
1623
1624		req.StreamSpec = &StreamSpecification{EnableStream: true, ExpirationTime: 24}
1625
1626		_, err := client.CreateTable(&req)
1627		c.Assert(err, IsNil)
1628	}
1629	defer client.DeleteTable(&DeleteTableRequest{TableName: tableName})
1630	var streamId *StreamId
1631	{
1632		resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName})
1633		c.Assert(err, IsNil)
1634		fmt.Printf("%#v\n", resp)
1635		c.Assert(len(resp.Streams), Equals, 1)
1636		streamId = resp.Streams[0].Id
1637	}
1638	c.Assert(streamId, NotNil)
1639	var shardId *ShardId
1640	for {
1641		resp, err := client.DescribeStream(&DescribeStreamRequest{StreamId: streamId})
1642		c.Assert(err, IsNil)
1643		fmt.Printf("DescribeStreamResponse: %#v\n", resp)
1644		c.Assert(*resp.StreamId, Equals, *streamId)
1645		c.Assert(resp.ExpirationTime, Equals, int32(24))
1646		c.Assert(*resp.TableName, Equals, tableName)
1647		c.Assert(len(resp.Shards), Equals, 1)
1648		fmt.Printf("StreamShard: %#v\n", resp.Shards[0])
1649		shardId = resp.Shards[0].SelfShard
1650		if resp.Status == SS_Active {
1651			break
1652		}
1653	}
1654	c.Assert(shardId, NotNil)
1655	var iter *ShardIterator
1656	var records []*StreamRecord
1657	{
1658		resp, err := client.GetShardIterator(&GetShardIteratorRequest{
1659			StreamId: streamId,
1660			ShardId:  shardId})
1661		c.Assert(err, IsNil)
1662		c.Assert(resp.ShardIterator, NotNil)
1663		iter = resp.ShardIterator
1664	}
1665	fmt.Printf("init iterator: %#v\n", *iter)
1666	iter, _ = exhaustStreamRecords(c, iter)
1667	fmt.Printf("put row:\n")
1668	{
1669		req := PutRowRequest{}
1670		rowChange := PutRowChange{}
1671		rowChange.TableName = tableName
1672		pk := PrimaryKey{}
1673		pk.AddPrimaryKeyColumn("pk1", "rowkey")
1674		rowChange.PrimaryKey = &pk
1675		rowChange.AddColumn("colToDel", "abc")
1676		rowChange.AddColumn("colToDelAll", true)
1677		rowChange.AddColumn("colToUpdate", int64(123))
1678		rowChange.SetCondition(RowExistenceExpectation_IGNORE)
1679		req.PutRowChange = &rowChange
1680		_, err := client.PutRow(&req)
1681		c.Assert(err, IsNil)
1682	}
1683	iter, records = exhaustStreamRecords(c, iter)
1684	var timestamp int64
1685	{
1686		c.Assert(len(records), Equals, 1)
1687		r := records[0]
1688		c.Assert(r.Type, Equals, AT_Put)
1689		c.Assert(r.Info, NotNil)
1690		c.Assert(r.PrimaryKey, NotNil)
1691
1692		pkey := r.PrimaryKey
1693		c.Assert(len(pkey.PrimaryKeys), Equals, 1)
1694		pkc := pkey.PrimaryKeys[0]
1695		c.Assert(pkc, NotNil)
1696		c.Assert(pkc.ColumnName, Equals, "pk1")
1697		c.Assert(pkc.Value, Equals, "rowkey")
1698		c.Assert(pkc.PrimaryKeyOption, Equals, NONE)
1699
1700		c.Assert(len(r.Columns), Equals, 3)
1701		attr0 := r.Columns[0]
1702		attr1 := r.Columns[1]
1703		attr2 := r.Columns[2]
1704		c.Assert(attr0, NotNil)
1705		c.Assert(*attr0.Name, Equals, "colToDel")
1706		c.Assert(attr0.Type, Equals, RCT_Put)
1707		c.Assert(attr0.Value, Equals, "abc")
1708		c.Assert(attr1, NotNil)
1709		c.Assert(*attr1.Name, Equals, "colToDelAll")
1710		c.Assert(attr1.Type, Equals, RCT_Put)
1711		c.Assert(attr1.Value, Equals, true)
1712		timestamp = *attr0.Timestamp
1713		c.Assert(attr2, NotNil)
1714		c.Assert(*attr2.Name, Equals, "colToUpdate")
1715		c.Assert(attr2.Type, Equals, RCT_Put)
1716		c.Assert(attr2.Value, Equals, int64(123))
1717	}
1718	{
1719		chg := UpdateRowChange{}
1720		chg.TableName = tableName
1721		pk := PrimaryKey{}
1722		pk.AddPrimaryKeyColumn("pk1", "rowkey")
1723		chg.PrimaryKey = &pk
1724		chg.SetCondition(RowExistenceExpectation_IGNORE)
1725		chg.DeleteColumnWithTimestamp("colToDel", timestamp)
1726		chg.DeleteColumn("colToDelAll")
1727		chg.PutColumn("colToUpdate", 3.14)
1728		_, err := client.UpdateRow(&UpdateRowRequest{UpdateRowChange: &chg})
1729		c.Assert(err, IsNil)
1730	}
1731	iter, records = exhaustStreamRecords(c, iter)
1732	{
1733		c.Assert(len(records), Equals, 1)
1734		r := records[0]
1735		c.Assert(r.Type, Equals, AT_Update)
1736		c.Assert(r.Info, NotNil)
1737		c.Assert(r.PrimaryKey, NotNil)
1738
1739		pkey := r.PrimaryKey
1740		c.Assert(len(pkey.PrimaryKeys), Equals, 1)
1741		pkc := pkey.PrimaryKeys[0]
1742		c.Assert(pkc, NotNil)
1743		c.Assert(pkc.ColumnName, Equals, "pk1")
1744		c.Assert(pkc.Value, Equals, "rowkey")
1745		c.Assert(pkc.PrimaryKeyOption, Equals, NONE)
1746
1747		c.Assert(len(r.Columns), Equals, 3)
1748		attr0 := r.Columns[0]
1749		attr1 := r.Columns[1]
1750		attr2 := r.Columns[2]
1751		c.Assert(attr0, NotNil)
1752		c.Assert(*attr0.Name, Equals, "colToDel")
1753		c.Assert(attr0.Type, Equals, RCT_DeleteOneVersion)
1754		c.Assert(attr0.Value, IsNil)
1755		c.Assert(attr0.Timestamp, NotNil)
1756		c.Assert(*attr0.Timestamp, Equals, timestamp)
1757		c.Assert(attr1, NotNil)
1758		c.Assert(*attr1.Name, Equals, "colToDelAll")
1759		c.Assert(attr1.Type, Equals, RCT_DeleteAllVersions)
1760		c.Assert(attr1.Value, IsNil)
1761		c.Assert(attr1.Timestamp, IsNil)
1762		c.Assert(attr2, NotNil)
1763		c.Assert(*attr2.Name, Equals, "colToUpdate")
1764		c.Assert(attr2.Type, Equals, RCT_Put)
1765		c.Assert(attr2.Value, Equals, 3.14)
1766	}
1767	{
1768		chg := DeleteRowChange{}
1769		chg.TableName = tableName
1770		pk := PrimaryKey{}
1771		pk.AddPrimaryKeyColumn("pk1", "rowkey")
1772		chg.PrimaryKey = &pk
1773		chg.SetCondition(RowExistenceExpectation_IGNORE)
1774		_, err := client.DeleteRow(&DeleteRowRequest{DeleteRowChange: &chg})
1775		c.Assert(err, IsNil)
1776	}
1777	iter, records = exhaustStreamRecords(c, iter)
1778	{
1779		c.Assert(len(records), Equals, 1)
1780		r := records[0]
1781		c.Assert(r.Type, Equals, AT_Delete)
1782		c.Assert(r.Info, NotNil)
1783		c.Assert(r.PrimaryKey, NotNil)
1784
1785		pkey := r.PrimaryKey
1786		c.Assert(len(pkey.PrimaryKeys), Equals, 1)
1787		pkc := pkey.PrimaryKeys[0]
1788		c.Assert(pkc, NotNil)
1789		c.Assert(pkc.ColumnName, Equals, "pk1")
1790		c.Assert(pkc.Value, Equals, "rowkey")
1791		c.Assert(pkc.PrimaryKeyOption, Equals, NONE)
1792
1793		c.Assert(len(r.Columns), Equals, 0)
1794	}
1795	fmt.Println("TestCreateTableWithStream finish")
1796}
1797
1798func exhaustStreamRecords(c *C, iter *ShardIterator) (*ShardIterator, []*StreamRecord) {
1799	records := make([]*StreamRecord, 0)
1800	for {
1801		resp, err := client.GetStreamRecord(&GetStreamRecordRequest{
1802			ShardIterator: iter})
1803		c.Assert(err, IsNil)
1804		fmt.Printf("#records: %d\n", len(resp.Records))
1805		for i, rec := range resp.Records {
1806			fmt.Printf("record %d: %s\n", i, rec)
1807		}
1808		for _, rec := range resp.Records {
1809			records = append(records, rec)
1810		}
1811		nextIter := resp.NextShardIterator
1812		if nextIter == nil {
1813			fmt.Printf("next iterator: %#v\n", nextIter)
1814			break
1815		} else {
1816			fmt.Printf("next iterator: %#v\n", *nextIter)
1817		}
1818		if *iter == *nextIter {
1819			break
1820		}
1821		iter = nextIter
1822	}
1823	return iter, records
1824}
1825