1package tablestore 2 3import ( 4 "fmt" 5 . "gopkg.in/check.v1" 6 "math/rand" 7 "net/http" 8 "os" 9 "runtime" 10 "strconv" 11 "strings" 12 "testing" 13 "time" 14 "io" 15 "syscall" 16) 17 18// Hook up gocheck into the "go test" runner. 19func Test(t *testing.T) { 20 TestingT(t) 21} 22 23type TableStoreSuite struct{} 24 25var tableNamePrefix string 26 27var _ = Suite(&TableStoreSuite{}) 28 29var defaultTableName = "defaulttable" 30var rangeQueryTableName = "rangetable" 31 32// Todo: use config 33var client TableStoreApi 34var invalidClient TableStoreApi 35 36func (s *TableStoreSuite) SetUpSuite(c *C) { 37 38 endpoint := os.Getenv("OTS_TEST_ENDPOINT") 39 instanceName := os.Getenv("OTS_TEST_INSTANCENAME") 40 accessKeyId := os.Getenv("OTS_TEST_KEYID") 41 accessKeySecret := os.Getenv("OTS_TEST_SECRET") 42 client = NewClient(endpoint, instanceName, accessKeyId, accessKeySecret) 43 44 tableNamePrefix = strings.Replace(runtime.Version(), ".", "", -1) 45 defaultTableName = tableNamePrefix + defaultTableName 46 rangeQueryTableName = tableNamePrefix + rangeQueryTableName 47 PrepareTable(defaultTableName) 48 PrepareTable2(rangeQueryTableName) 49 invalidClient = NewClient(endpoint, instanceName, accessKeyId, "invalidsecret") 50} 51 52func PrepareTable(tableName string) error { 53 createtableRequest := new(CreateTableRequest) 54 tableMeta := new(TableMeta) 55 tableMeta.TableName = tableName 56 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 57 tableOption := new(TableOption) 58 tableOption.TimeToAlive = -1 59 tableOption.MaxVersion = 3 60 reservedThroughput := new(ReservedThroughput) 61 reservedThroughput.Readcap = 0 62 reservedThroughput.Writecap = 0 63 createtableRequest.TableMeta = tableMeta 64 createtableRequest.TableOption = tableOption 65 createtableRequest.ReservedThroughput = reservedThroughput 66 _, error := client.CreateTable(createtableRequest) 67 return error 68} 69 70func PrepareTable2(tableName string) error { 71 createtableRequest := new(CreateTableRequest) 72 tableMeta := new(TableMeta) 73 tableMeta.TableName = tableName 74 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 75 tableMeta.AddPrimaryKeyColumn("pk2", PrimaryKeyType_STRING) 76 tableOption := new(TableOption) 77 tableOption.TimeToAlive = -1 78 tableOption.MaxVersion = 3 79 reservedThroughput := new(ReservedThroughput) 80 reservedThroughput.Readcap = 0 81 reservedThroughput.Writecap = 0 82 createtableRequest.TableMeta = tableMeta 83 createtableRequest.TableOption = tableOption 84 createtableRequest.ReservedThroughput = reservedThroughput 85 _, error := client.CreateTable(createtableRequest) 86 return error 87} 88 89func (s *TableStoreSuite) TestCreateTable(c *C) { 90 fmt.Println("TestCreateTable finished") 91 92 tableName := tableNamePrefix + "testcreatetable1" 93 94 deleteReq := new(DeleteTableRequest) 95 deleteReq.TableName = tableName 96 client.DeleteTable(deleteReq) 97 98 createtableRequest := new(CreateTableRequest) 99 100 tableMeta := new(TableMeta) 101 tableMeta.TableName = tableName 102 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 103 104 tableOption := new(TableOption) 105 106 tableOption.TimeToAlive = -1 107 tableOption.MaxVersion = 3 108 109 reservedThroughput := new(ReservedThroughput) 110 reservedThroughput.Readcap = 0 111 reservedThroughput.Writecap = 0 112 113 createtableRequest.TableMeta = tableMeta 114 createtableRequest.TableOption = tableOption 115 createtableRequest.ReservedThroughput = reservedThroughput 116 117 _, error := client.CreateTable(createtableRequest) 118 c.Check(error, Equals, nil) 119 120 fmt.Println("TestCreateTable finished") 121} 122 123func (s *TableStoreSuite) TestReCreateTableAndPutRow(c *C) { 124 fmt.Println("TestReCreateTableAndPutRow started") 125 126 tableName := tableNamePrefix + "testrecreatetable1" 127 128 deleteReq := new(DeleteTableRequest) 129 deleteReq.TableName = tableName 130 client.DeleteTable(deleteReq) 131 132 createtableRequest := new(CreateTableRequest) 133 134 tableMeta := new(TableMeta) 135 tableMeta.TableName = tableName 136 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 137 138 tableOption := new(TableOption) 139 140 tableOption.TimeToAlive = -1 141 tableOption.MaxVersion = 3 142 143 reservedThroughput := new(ReservedThroughput) 144 reservedThroughput.Readcap = 0 145 reservedThroughput.Writecap = 0 146 147 createtableRequest.TableMeta = tableMeta 148 createtableRequest.TableOption = tableOption 149 createtableRequest.ReservedThroughput = reservedThroughput 150 151 _, error := client.CreateTable(createtableRequest) 152 c.Check(error, Equals, nil) 153 154 //time.Sleep(500 * time.Millisecond) 155 _, error = client.DeleteTable(deleteReq) 156 c.Check(error, Equals, nil) 157 158 _, error = client.CreateTable(createtableRequest) 159 c.Check(error, Equals, nil) 160 161 putRowRequest := new(PutRowRequest) 162 putRowChange := new(PutRowChange) 163 putRowChange.TableName = tableName 164 putPk := new(PrimaryKey) 165 putPk.AddPrimaryKeyColumn("pk1", "key1") 166 putRowChange.PrimaryKey = putPk 167 putRowChange.AddColumn("col1", "col1data1") 168 putRowChange.AddColumn("col2", int64(100)) 169 putRowChange.AddColumn("col3", float64(2.1)) 170 putRowChange.AddColumn("col4", true) 171 putRowChange.AddColumn("col5", int64(50)) 172 putRowChange.AddColumn("col6", int64(60)) 173 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 174 putRowRequest.PutRowChange = putRowChange 175 _, error = client.PutRow(putRowRequest) 176 c.Check(error, Equals, nil) 177 178 fmt.Println("TestReCreateTableAndPutRow finished") 179} 180 181func (s *TableStoreSuite) TestListTable(c *C) { 182 listtables, error := client.ListTable() 183 c.Check(error, Equals, nil) 184 defaultTableExist := false 185 for _, table := range listtables.TableNames { 186 fmt.Println(table) 187 if table == defaultTableName { 188 defaultTableExist = true 189 break 190 } 191 } 192 193 c.Check(defaultTableExist, Equals, true) 194} 195 196func (s *TableStoreSuite) TestUpdateAndDescribeTable(c *C) { 197 fmt.Println("TestUpdateAndDescribeTable started") 198 updateTableReq := new(UpdateTableRequest) 199 updateTableReq.TableName = defaultTableName 200 updateTableReq.TableOption = new(TableOption) 201 updateTableReq.TableOption.TimeToAlive = -1 202 updateTableReq.TableOption.MaxVersion = 5 203 204 updateTableResp, error := client.UpdateTable(updateTableReq) 205 c.Assert(error, Equals, nil) 206 c.Assert(updateTableResp, NotNil) 207 c.Assert(updateTableResp.TableOption.TimeToAlive, Equals, updateTableReq.TableOption.TimeToAlive) 208 c.Assert(updateTableResp.TableOption.MaxVersion, Equals, updateTableReq.TableOption.MaxVersion) 209 210 describeTableReq := new(DescribeTableRequest) 211 describeTableReq.TableName = defaultTableName 212 describ, error := client.DescribeTable(describeTableReq) 213 c.Assert(error, Equals, nil) 214 215 c.Assert(describ, NotNil) 216 c.Assert(describ.TableOption.TimeToAlive, Equals, updateTableReq.TableOption.TimeToAlive) 217 c.Assert(describ.TableOption.MaxVersion, Equals, updateTableReq.TableOption.MaxVersion) 218 fmt.Println("TestUpdateAndDescribeTable finished") 219} 220 221func (s *TableStoreSuite) TestTableWithKeyAutoIncrement(c *C) { 222 tableName := tableNamePrefix + "incrementtable" 223 createtableRequest := new(CreateTableRequest) 224 225 tableMeta := new(TableMeta) 226 tableMeta.TableName = tableName 227 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 228 tableMeta.AddPrimaryKeyColumnOption("pk2", PrimaryKeyType_INTEGER, AUTO_INCREMENT) 229 230 tableOption := new(TableOption) 231 tableOption.TimeToAlive = -1 232 tableOption.MaxVersion = 3 233 234 reservedThroughput := new(ReservedThroughput) 235 reservedThroughput.Readcap = 0 236 reservedThroughput.Writecap = 0 237 238 createtableRequest.TableMeta = tableMeta 239 createtableRequest.TableOption = tableOption 240 createtableRequest.ReservedThroughput = reservedThroughput 241 242 client.CreateTable(createtableRequest) 243 rowCount := 100 244 for i := 0; i < rowCount; i++ { 245 putRowRequest := new(PutRowRequest) 246 putRowChange := new(PutRowChange) 247 putRowChange.TableName = tableName 248 putPk := new(PrimaryKey) 249 putPk.AddPrimaryKeyColumn("pk1", "key"+strconv.Itoa(i)) 250 putPk.AddPrimaryKeyColumnWithAutoIncrement("pk2") 251 putRowChange.PrimaryKey = putPk 252 putRowChange.AddColumn("col1", "col1data1") 253 putRowChange.AddColumn("col2", int64(100)) 254 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 255 putRowRequest.PutRowChange = putRowChange 256 putRowRequest.PutRowChange.SetReturnPk() 257 response, error := client.PutRow(putRowRequest) 258 c.Check(error, Equals, nil) 259 c.Check(len(response.PrimaryKey.PrimaryKeys), Equals, 2) 260 c.Check(response.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1") 261 c.Check(response.PrimaryKey.PrimaryKeys[0].Value, Equals, "key"+strconv.Itoa(i)) 262 c.Check(response.PrimaryKey.PrimaryKeys[1].ColumnName, Equals, "pk2") 263 c.Check(response.PrimaryKey.PrimaryKeys[1].Value.(int64) > 0, Equals, true) 264 265 fmt.Println(response.PrimaryKey.PrimaryKeys[1].Value) 266 } 267 268 describeTableReq := new(DescribeTableRequest) 269 describeTableReq.TableName = tableName 270 _, error := client.DescribeTable(describeTableReq) 271 c.Check(error, IsNil) 272} 273 274func (s *TableStoreSuite) TestPutGetRow(c *C) { 275 fmt.Println("TestPutGetRow started") 276 putRowRequest := new(PutRowRequest) 277 putRowChange := new(PutRowChange) 278 putRowChange.TableName = defaultTableName 279 putPk := new(PrimaryKey) 280 putPk.AddPrimaryKeyColumn("pk1", "Key6") 281 putRowChange.PrimaryKey = putPk 282 putRowChange.AddColumn("col1", "col1data1") 283 putRowChange.AddColumn("col2", int64(100)) 284 putRowChange.AddColumn("col3", float64(2.1)) 285 putRowChange.AddColumn("col4", true) 286 putRowChange.AddColumn("col5", int64(50)) 287 putRowChange.AddColumn("col6", int64(60)) 288 putRowChange.AddColumn("col7", []byte("testbytes")) 289 putRowChange.AddColumn("col8", false) 290 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 291 putRowRequest.PutRowChange = putRowChange 292 _, error := client.PutRow(putRowRequest) 293 c.Check(error, Equals, nil) 294 295 getRowRequest := new(GetRowRequest) 296 criteria := new(SingleRowQueryCriteria) 297 criteria.PrimaryKey = putPk 298 getRowRequest.SingleRowQueryCriteria = criteria 299 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 300 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 301 getResp, error := client.GetRow(getRowRequest) 302 c.Check(error, Equals, nil) 303 c.Check(getResp, NotNil) 304 c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1) 305 c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1") 306 c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, "Key6") 307 c.Check(len(getResp.Columns), Equals, 8) 308 c.Check(getResp.Columns[0].ColumnName, Equals, "col1") 309 c.Check(getResp.Columns[0].Value, Equals, "col1data1") 310 c.Check(getResp.Columns[1].ColumnName, Equals, "col2") 311 c.Check(getResp.Columns[1].Value, Equals, int64(100)) 312 c.Check(getResp.Columns[2].ColumnName, Equals, "col3") 313 c.Check(getResp.Columns[2].Value, Equals, float64(2.1)) 314 c.Check(getResp.Columns[3].ColumnName, Equals, "col4") 315 c.Check(getResp.Columns[3].Value, Equals, true) 316 c.Check(getResp.Columns[4].ColumnName, Equals, "col5") 317 c.Check(getResp.Columns[4].Value, Equals, int64(50)) 318 c.Check(getResp.Columns[5].ColumnName, Equals, "col6") 319 c.Check(getResp.Columns[5].Value, Equals, int64(60)) 320 c.Check(getResp.Columns[6].ColumnName, Equals, "col7") 321 mapData := getResp.GetColumnMap() 322 c.Check(mapData.Columns["col1"][0].Value, Equals, "col1data1") 323 c.Check(mapData.Columns["col2"][0].Value, Equals, int64(100)) 324 c.Check(mapData.Columns["col3"][0].Value, Equals, float64(2.1)) 325 c.Check(mapData.Columns["col4"][0].Value, Equals, true) 326 c.Check(mapData.Columns["col5"][0].Value, Equals, int64(50)) 327 c.Check(mapData.Columns["col6"][0].Value, Equals, int64(60)) 328 329 sortedColumn, error := mapData.GetRange(2, 2) 330 c.Check(error, Equals, nil) 331 c.Check(len(sortedColumn), Equals, 2) 332 c.Check(sortedColumn[0], Equals, mapData.Columns["col3"][0]) 333 c.Check(sortedColumn[1], Equals, mapData.Columns["col4"][0]) 334 335 mapData2 := getResp.GetColumnMap() 336 c.Check(mapData2.Columns["col1"][0].Value, Equals, "col1data1") 337 338 _, error = mapData.GetRange(2, 10) 339 c.Check(error, NotNil) 340 // Test add column to get 341 getRowRequest = new(GetRowRequest) 342 criteria = new(SingleRowQueryCriteria) 343 criteria.PrimaryKey = putPk 344 getRowRequest.SingleRowQueryCriteria = criteria 345 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 346 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 347 getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1") 348 getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col2") 349 350 getResp, error = client.GetRow(getRowRequest) 351 352 c.Check(error, Equals, nil) 353 c.Check(getResp, NotNil) 354 c.Check(len(getResp.Columns), Equals, 2) 355 356 _, error = invalidClient.GetRow(getRowRequest) 357 c.Check(error, NotNil) 358 359 getRowRequest = new(GetRowRequest) 360 criteria = new(SingleRowQueryCriteria) 361 criteria.PrimaryKey = putPk 362 getRowRequest.SingleRowQueryCriteria = criteria 363 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 364 _, error = client.GetRow(getRowRequest) 365 c.Check(error, NotNil) 366 367 notExistPk := new(PrimaryKey) 368 notExistPk.AddPrimaryKeyColumn("pk1", "notexistpk") 369 getRowRequest = new(GetRowRequest) 370 criteria = new(SingleRowQueryCriteria) 371 372 criteria.PrimaryKey = notExistPk 373 getRowRequest.SingleRowQueryCriteria = criteria 374 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 375 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 376 377 getResp, error = client.GetRow(getRowRequest) 378 c.Check(error, IsNil) 379 c.Check(getResp, NotNil) 380 381 colmap := getResp.GetColumnMap() 382 c.Check(colmap, NotNil) 383 384 fmt.Println("TestPutGetRow finished") 385} 386 387func (s *TableStoreSuite) TestCreateTableAndPutRow(c *C) { 388 fmt.Println("TestCreateTableAndPutRow finished") 389 390 tableName := tableNamePrefix + "testpkschema" 391 deleteReq := new(DeleteTableRequest) 392 deleteReq.TableName = tableName 393 client.DeleteTable(deleteReq) 394 395 createtableRequest := new(CreateTableRequest) 396 397 tableMeta := new(TableMeta) 398 tableMeta.TableName = tableName 399 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 400 tableMeta.AddPrimaryKeyColumn("pk2", PrimaryKeyType_INTEGER) 401 tableMeta.AddPrimaryKeyColumn("pk3", PrimaryKeyType_BINARY) 402 403 tableOption := new(TableOption) 404 405 tableOption.TimeToAlive = -1 406 tableOption.MaxVersion = 3 407 408 reservedThroughput := new(ReservedThroughput) 409 reservedThroughput.Readcap = 0 410 reservedThroughput.Writecap = 0 411 412 createtableRequest.TableMeta = tableMeta 413 createtableRequest.TableOption = tableOption 414 createtableRequest.ReservedThroughput = reservedThroughput 415 416 _, error := client.CreateTable(createtableRequest) 417 c.Check(error, Equals, nil) 418 419 putRowRequest := new(PutRowRequest) 420 putRowChange := new(PutRowChange) 421 putRowChange.TableName = tableName 422 putPk := new(PrimaryKey) 423 putPk.AddPrimaryKeyColumn("pk1", "key2") 424 putPk.AddPrimaryKeyColumn("pk2", int64(5)) 425 putPk.AddPrimaryKeyColumn("pk3", []byte("byteskey1")) 426 putRowChange.PrimaryKey = putPk 427 428 timeNow := time.Now().Unix() * 1000 429 putRowChange.AddColumnWithTimestamp("col1", "col1data1", timeNow) 430 putRowChange.AddColumn("col2", int64(100)) 431 putRowChange.AddColumn("col3", float64(2.1)) 432 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 433 putRowRequest.PutRowChange = putRowChange 434 _, error = client.PutRow(putRowRequest) 435 c.Check(error, Equals, nil) 436 437 fmt.Println("TestCreateTableAndPutRow finished") 438} 439 440func (s *TableStoreSuite) TestPutGetRowWithTimestamp(c *C) { 441 fmt.Println("TestPutGetRowWithTimestamp started") 442 putRowRequest := new(PutRowRequest) 443 putRowChange := new(PutRowChange) 444 putRowChange.TableName = defaultTableName 445 putPk := new(PrimaryKey) 446 putPk.AddPrimaryKeyColumn("pk1", "testtskey1") 447 putRowChange.PrimaryKey = putPk 448 timeNow := time.Now().Unix() * 1000 449 putRowChange.AddColumnWithTimestamp("col1", "col1data1", timeNow) 450 putRowChange.AddColumn("col2", int64(100)) 451 putRowChange.AddColumn("col3", float64(2.1)) 452 putRowChange.AddColumn("col4", true) 453 putRowChange.AddColumn("col5", int64(50)) 454 putRowChange.AddColumn("col6", int64(60)) 455 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 456 putRowRequest.PutRowChange = putRowChange 457 _, error := client.PutRow(putRowRequest) 458 c.Check(error, Equals, nil) 459 460 getRowRequest := new(GetRowRequest) 461 criteria := new(SingleRowQueryCriteria) 462 criteria.PrimaryKey = putPk 463 getRowRequest.SingleRowQueryCriteria = criteria 464 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 465 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 466 // getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow} 467 getResp, error := client.GetRow(getRowRequest) 468 c.Check(error, Equals, nil) 469 c.Check(getResp, NotNil) 470 c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1) 471 c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1") 472 c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, "testtskey1") 473 c.Check(len(getResp.Columns), Equals, 6) 474 c.Check(getResp.Columns[0].ColumnName, Equals, "col1") 475 c.Check(getResp.Columns[0].Value, Equals, "col1data1") 476 c.Check(getResp.Columns[0].Timestamp, Equals, timeNow) 477 c.Check(getResp.Columns[1].ColumnName, Equals, "col2") 478 c.Check(getResp.Columns[1].Value, Equals, int64(100)) 479 c.Check(getResp.Columns[2].ColumnName, Equals, "col3") 480 c.Check(getResp.Columns[2].Value, Equals, float64(2.1)) 481 c.Check(getResp.Columns[3].ColumnName, Equals, "col4") 482 c.Check(getResp.Columns[3].Value, Equals, true) 483 c.Check(getResp.Columns[4].ColumnName, Equals, "col5") 484 c.Check(getResp.Columns[4].Value, Equals, int64(50)) 485 c.Check(getResp.Columns[5].ColumnName, Equals, "col6") 486 c.Check(getResp.Columns[5].Value, Equals, int64(60)) 487 488 getRowRequest.SingleRowQueryCriteria.MaxVersion = 0 489 fmt.Println("timerange", timeNow) 490 getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1") 491 getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow - 1} 492 getResp2, error := client.GetRow(getRowRequest) 493 c.Check(error, Equals, nil) 494 c.Check(getResp2, NotNil) 495 c.Check(len(getResp2.PrimaryKey.PrimaryKeys), Equals, 0) 496 497 getRowRequest.SingleRowQueryCriteria.MaxVersion = 0 498 fmt.Println("timerange", timeNow) 499 getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1") 500 getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Start: timeNow + 1, End: timeNow + 2} 501 getResp2, error = client.GetRow(getRowRequest) 502 c.Check(error, Equals, nil) 503 c.Check(getResp2, NotNil) 504 505 getRowRequest.SingleRowQueryCriteria.MaxVersion = 0 506 fmt.Println("timerange", timeNow) 507 getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1") 508 getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow - 1} 509 getResp2, error = client.GetRow(getRowRequest) 510 c.Check(error, Equals, nil) 511 c.Check(getResp2, NotNil) 512 c.Check(len(getResp2.PrimaryKey.PrimaryKeys), Equals, 0) 513 514 fmt.Println("timerange", timeNow) 515 getRowRequest.SingleRowQueryCriteria.AddColumnToGet("col1") 516 getRowRequest.SingleRowQueryCriteria.TimeRange = &TimeRange{Start: timeNow - 1, End: timeNow + 2} 517 getResp2, error = client.GetRow(getRowRequest) 518 c.Check(error, Equals, nil) 519 c.Check(getResp2, NotNil) 520 c.Check(len(getResp2.PrimaryKey.PrimaryKeys), Equals, 1) 521 522 fmt.Println("TestPutGetRowWithTimestamp finished") 523} 524 525func (s *TableStoreSuite) TestPutGetRowWithFilter(c *C) { 526 fmt.Println("TestPutGetRowWithFilter started") 527 putRowRequest := new(PutRowRequest) 528 putRowChange := new(PutRowChange) 529 putRowChange.TableName = defaultTableName 530 putPk := new(PrimaryKey) 531 putPk.AddPrimaryKeyColumn("pk1", "Key6") 532 putRowChange.PrimaryKey = putPk 533 putRowChange.AddColumn("col1", "col1data1") 534 putRowChange.AddColumn("col2", int64(100)) 535 putRowChange.AddColumn("col3", float64(5.1)) 536 putRowChange.AddColumn("col4", true) 537 putRowChange.AddColumn("col5", int64(50)) 538 putRowChange.AddColumn("col6", int64(60)) 539 putRowChange.AddColumn("col7", []byte("testbytes")) 540 putRowChange.AddColumn("col8", false) 541 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 542 clCondition1 := NewSingleColumnCondition("col2", CT_GREATER_EQUAL, int64(100)) 543 clCondition2 := NewSingleColumnCondition("col5", CT_NOT_EQUAL, int64(20)) 544 clCondition3 := NewSingleColumnCondition("col6", CT_LESS_THAN, int64(100)) 545 clCondition4 := NewSingleColumnCondition("col4", CT_EQUAL, true) 546 clCondition5 := NewSingleColumnCondition("col1", CT_EQUAL, "col1data1") 547 clCondition6 := NewSingleColumnCondition("col3", CT_LESS_EQUAL, float64(5.1)) 548 clCondition7 := NewSingleColumnCondition("col7", CT_EQUAL, []byte("testbytes")) 549 clCondition8 := NewSingleColumnCondition("col5", CT_GREATER_THAN, int64(20)) 550 551 cf := NewCompositeColumnCondition(LO_AND) 552 cf.AddFilter(clCondition1) 553 cf.AddFilter(clCondition2) 554 cf.AddFilter(clCondition3) 555 cf.AddFilter(clCondition4) 556 cf.AddFilter(clCondition5) 557 cf.AddFilter(clCondition6) 558 cf.AddFilter(clCondition7) 559 cf.AddFilter(clCondition8) 560 putRowChange.SetColumnCondition(cf) 561 562 putRowRequest.PutRowChange = putRowChange 563 _, error := client.PutRow(putRowRequest) 564 c.Check(error, Equals, nil) 565 566 cf2 := NewCompositeColumnCondition(LO_OR) 567 cf2.AddFilter(clCondition7) 568 cf2.AddFilter(clCondition8) 569 cf3 := NewCompositeColumnCondition(LO_NOT) 570 clCondition9 := NewSingleColumnCondition("col5", CT_GREATER_THAN, int64(200)) 571 cf3.AddFilter(clCondition9) 572 cf2.AddFilter(cf3) 573 574 getRowRequest := new(GetRowRequest) 575 criteria := new(SingleRowQueryCriteria) 576 criteria.PrimaryKey = putPk 577 getRowRequest.SingleRowQueryCriteria = criteria 578 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 579 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 580 getRowRequest.SingleRowQueryCriteria.SetFilter(cf2) 581 getResp, error := client.GetRow(getRowRequest) 582 c.Check(error, Equals, nil) 583 c.Check(getResp, NotNil) 584 c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1) 585 c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1") 586 c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, "Key6") 587 c.Check(len(getResp.Columns), Equals, 8) 588 c.Check(getResp.Columns[0].ColumnName, Equals, "col1") 589 c.Check(getResp.Columns[0].Value, Equals, "col1data1") 590 c.Check(getResp.Columns[1].ColumnName, Equals, "col2") 591 c.Check(getResp.Columns[1].Value, Equals, int64(100)) 592 c.Check(getResp.Columns[2].ColumnName, Equals, "col3") 593 c.Check(getResp.Columns[2].Value, Equals, float64(5.1)) 594 c.Check(getResp.Columns[3].ColumnName, Equals, "col4") 595 c.Check(getResp.Columns[3].Value, Equals, true) 596 c.Check(getResp.Columns[4].ColumnName, Equals, "col5") 597 c.Check(getResp.Columns[4].Value, Equals, int64(50)) 598 c.Check(getResp.Columns[5].ColumnName, Equals, "col6") 599 c.Check(getResp.Columns[5].Value, Equals, int64(60)) 600 601 getRowRequest = new(GetRowRequest) 602 criteria = new(SingleRowQueryCriteria) 603 criteria.PrimaryKey = putPk 604 getRowRequest.SingleRowQueryCriteria = criteria 605 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 606 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 607 608 pagedFilter := &PaginationFilter{} 609 pagedFilter.Limit = 3 610 pagedFilter.Offset = 1 611 getRowRequest.SingleRowQueryCriteria.SetFilter(pagedFilter) 612 getResp, error = client.GetRow(getRowRequest) 613 c.Check(error, Equals, nil) 614 c.Check(getResp, NotNil) 615 c.Check(len(getResp.Columns), Equals, 3) 616 617 getRowRequest = new(GetRowRequest) 618 criteria = new(SingleRowQueryCriteria) 619 criteria.PrimaryKey = putPk 620 getRowRequest.SingleRowQueryCriteria = criteria 621 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 622 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 623 624 getRowRequest.SingleRowQueryCriteria.SetStartColumn("col3") 625 pagedFilter = &PaginationFilter{} 626 pagedFilter.Limit = 3 627 pagedFilter.Offset = 1 628 getRowRequest.SingleRowQueryCriteria.SetFilter(pagedFilter) 629 getResp, error = client.GetRow(getRowRequest) 630 c.Check(error, Equals, nil) 631 c.Check(getResp, NotNil) 632 c.Check(getResp.Columns[0].ColumnName, Equals, "col4") 633 fmt.Println("TestPutGetRowWithFilter finished") 634} 635 636func (s *TableStoreSuite) TestPutUpdateDeleteRow(c *C) { 637 fmt.Println("TestPutUpdateDeleteRow started") 638 keyToUpdate := "pk1toupdate" 639 putRowRequest := new(PutRowRequest) 640 putRowChange := new(PutRowChange) 641 putRowChange.TableName = defaultTableName 642 putPk := new(PrimaryKey) 643 putPk.AddPrimaryKeyColumn("pk1", keyToUpdate) 644 putRowChange.PrimaryKey = putPk 645 putRowChange.AddColumn("col1", "col1data1") 646 timeNow := int64(time.Now().Unix() * 1000) 647 putRowChange.AddColumnWithTimestamp("col10", "col10data10", timeNow) 648 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 649 putRowRequest.PutRowChange = putRowChange 650 _, error := client.PutRow(putRowRequest) 651 c.Check(error, Equals, nil) 652 653 updateRowRequest := new(UpdateRowRequest) 654 updateRowChange := new(UpdateRowChange) 655 updateRowChange.TableName = defaultTableName 656 updatePk := new(PrimaryKey) 657 updatePk.AddPrimaryKeyColumn("pk1", keyToUpdate) 658 updateRowChange.PrimaryKey = updatePk 659 updateRowChange.DeleteColumn("col1") 660 updateRowChange.DeleteColumnWithTimestamp("col10", timeNow) 661 updateRowChange.PutColumn("col2", int64(77)) 662 updateRowChange.PutColumn("col3", "newcol3") 663 updateRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST) 664 updateRowRequest.UpdateRowChange = updateRowChange 665 _, error = client.UpdateRow(updateRowRequest) 666 c.Check(error, Equals, nil) 667 668 getRowRequest := new(GetRowRequest) 669 criteria := new(SingleRowQueryCriteria) 670 criteria.PrimaryKey = putPk 671 getRowRequest.SingleRowQueryCriteria = criteria 672 getRowRequest.SingleRowQueryCriteria.TableName = defaultTableName 673 getRowRequest.SingleRowQueryCriteria.MaxVersion = 1 674 getResp, error := client.GetRow(getRowRequest) 675 c.Check(error, Equals, nil) 676 c.Check(getResp, NotNil) 677 678 c.Check(len(getResp.PrimaryKey.PrimaryKeys), Equals, 1) 679 c.Check(getResp.PrimaryKey.PrimaryKeys[0].ColumnName, Equals, "pk1") 680 c.Check(getResp.PrimaryKey.PrimaryKeys[0].Value, Equals, keyToUpdate) 681 c.Check(len(getResp.Columns), Equals, 2) 682 c.Check(getResp.Columns[0].ColumnName, Equals, "col2") 683 c.Check(getResp.Columns[0].Value, Equals, int64(77)) 684 c.Check(getResp.Columns[1].ColumnName, Equals, "col3") 685 c.Check(getResp.Columns[1].Value, Equals, "newcol3") 686 687 deleteRowReq := new(DeleteRowRequest) 688 deleteRowReq.DeleteRowChange = new(DeleteRowChange) 689 deleteRowReq.DeleteRowChange.TableName = defaultTableName 690 deletePk := new(PrimaryKey) 691 deletePk.AddPrimaryKeyColumn("pk1", keyToUpdate) 692 deleteRowReq.DeleteRowChange.PrimaryKey = deletePk 693 deleteRowReq.DeleteRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST) 694 clCondition1 := NewSingleColumnCondition("col2", CT_EQUAL, int64(77)) 695 deleteRowReq.DeleteRowChange.SetColumnCondition(clCondition1) 696 resp, error := client.DeleteRow(deleteRowReq) 697 c.Check(error, Equals, nil) 698 fmt.Println(resp.ConsumedCapacityUnit.Write) 699 fmt.Println(resp.ConsumedCapacityUnit.Read) 700 701 _, error = invalidClient.UpdateRow(updateRowRequest) 702 c.Check(error, NotNil) 703 704 _, error = invalidClient.DeleteRow(deleteRowReq) 705 c.Check(error, NotNil) 706 707 fmt.Println("TestPutUpdateDeleteRow finished") 708} 709 710func (s *TableStoreSuite) TestBatchGetRow(c *C) { 711 fmt.Println("TestBatchGetRow started") 712 rowCount := 100 713 for i := 0; i < rowCount; i++ { 714 key := "batchkey" + strconv.Itoa(i) 715 value := "value" + strconv.Itoa(i) 716 PrepareDataInDefaultTable(key, value) 717 } 718 719 batchGetReq := &BatchGetRowRequest{} 720 mqCriteria := &MultiRowQueryCriteria{} 721 722 for i := 0; i < rowCount; i++ { 723 pkToGet := new(PrimaryKey) 724 key := "batchkey" + strconv.Itoa(i) 725 pkToGet.AddPrimaryKeyColumn("pk1", key) 726 mqCriteria.AddRow(pkToGet) 727 } 728 mqCriteria.MaxVersion = 1 729 mqCriteria.TableName = defaultTableName 730 batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria) 731 batchGetResponse, error := client.BatchGetRow(batchGetReq) 732 c.Check(error, Equals, nil) 733 734 c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1) 735 c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount) 736 737 for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] { 738 c.Check(rowToCheck.Index, Equals, int32(index)) 739 c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName) 740 c.Check(rowToCheck.IsSucceed, Equals, true) 741 c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1) 742 c.Check(len(rowToCheck.Columns), Equals, 1) 743 } 744 745 batchGetReq = &BatchGetRowRequest{} 746 mqCriteria = &MultiRowQueryCriteria{} 747 748 for i := 0; i < rowCount; i++ { 749 pkToGet := new(PrimaryKey) 750 key := "batchkey" + strconv.Itoa(i) 751 pkToGet.AddPrimaryKeyColumn("pk1", key) 752 mqCriteria.AddRow(pkToGet) 753 mqCriteria.AddColumnToGet("col1") 754 } 755 timeNow := time.Now().Unix() * 1000 756 mqCriteria.TimeRange = &TimeRange{Start: timeNow - 10000, End: timeNow + 10000} 757 mqCriteria.TableName = defaultTableName 758 batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria) 759 batchGetResponse, error = client.BatchGetRow(batchGetReq) 760 c.Check(error, Equals, nil) 761 c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1) 762 c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount) 763 764 for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] { 765 c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName) 766 c.Check(rowToCheck.IsSucceed, Equals, true) 767 c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1) 768 c.Check(len(rowToCheck.Columns), Equals, 1) 769 c.Check(rowToCheck.Index, Equals, int32(index)) 770 } 771 772 // test timerange 773 batchGetReq = &BatchGetRowRequest{} 774 mqCriteria = &MultiRowQueryCriteria{} 775 776 for i := 0; i < rowCount; i++ { 777 pkToGet := new(PrimaryKey) 778 key := "batchkey" + strconv.Itoa(i) 779 pkToGet.AddPrimaryKeyColumn("pk1", key) 780 mqCriteria.AddRow(pkToGet) 781 } 782 mqCriteria.TimeRange = &TimeRange{Start: timeNow + 10000, End: timeNow + 20000} 783 mqCriteria.TableName = defaultTableName 784 batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria) 785 batchGetResponse, error = client.BatchGetRow(batchGetReq) 786 c.Check(error, Equals, nil) 787 788 c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1) 789 c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount) 790 791 for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] { 792 c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName) 793 c.Check(rowToCheck.IsSucceed, Equals, true) 794 c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1) 795 c.Check(len(rowToCheck.Columns), Equals, 0) 796 c.Check(rowToCheck.Index, Equals, int32(index)) 797 } 798 _, error = invalidClient.BatchGetRow(batchGetReq) 799 c.Check(error, NotNil) 800 801 fmt.Println("TestBatchGetRow started") 802} 803 804func (s *TableStoreSuite) TestBatchGetRowWithFilter(c *C) { 805 fmt.Println("TestBatchGetRowWithFilter started") 806 rowCount := 100 807 for i := 0; i < rowCount; i++ { 808 key := "filterbatchkey" + strconv.Itoa(i) 809 value1 := "col0value" + strconv.Itoa(i) 810 value2 := "col1value" + strconv.Itoa(i) 811 value3 := "col2value" + strconv.Itoa(i) 812 PrepareDataInDefaultTableWithMultiAttribute(key, value1, value2, value3) 813 } 814 815 // pagination filter 816 pagedFilter := &PaginationFilter{} 817 pagedFilter.Limit = 2 818 pagedFilter.Offset = 1 819 820 batchGetReq := &BatchGetRowRequest{} 821 mqCriteria := &MultiRowQueryCriteria{} 822 mqCriteria.SetFilter(pagedFilter) 823 824 for i := 0; i < rowCount; i++ { 825 pkToGet := new(PrimaryKey) 826 key := "filterbatchkey" + strconv.Itoa(i) 827 pkToGet.AddPrimaryKeyColumn("pk1", key) 828 mqCriteria.AddRow(pkToGet) 829 } 830 831 mqCriteria.MaxVersion = 1 832 mqCriteria.TableName = defaultTableName 833 batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria) 834 batchGetResponse, error := client.BatchGetRow(batchGetReq) 835 c.Check(error, Equals, nil) 836 837 c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1) 838 c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount) 839 840 for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] { 841 c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName) 842 c.Check(rowToCheck.IsSucceed, Equals, true) 843 c.Check(len(rowToCheck.PrimaryKey.PrimaryKeys), Equals, 1) 844 c.Check(len(rowToCheck.Columns), Equals, 2) 845 c.Check(rowToCheck.Index, Equals, int32(index)) 846 } 847 848 // compsite filter 849 batchGetReq = &BatchGetRowRequest{} 850 clCondition1 := NewSingleColumnCondition("col1", CT_EQUAL, "col0value1") 851 clCondition2 := NewSingleColumnCondition("col2", CT_EQUAL, "col1value1") 852 853 cf := NewCompositeColumnCondition(LO_AND) 854 cf.AddFilter(clCondition1) 855 cf.AddFilter(clCondition2) 856 857 mqCriteria = &MultiRowQueryCriteria{} 858 mqCriteria.SetFilter(cf) 859 860 for i := 0; i < rowCount; i++ { 861 pkToGet := new(PrimaryKey) 862 key := "filterbatchkey" + strconv.Itoa(i) 863 pkToGet.AddPrimaryKeyColumn("pk1", key) 864 mqCriteria.AddRow(pkToGet) 865 } 866 867 mqCriteria.MaxVersion = 1 868 mqCriteria.TableName = defaultTableName 869 batchGetReq.MultiRowQueryCriteria = append(batchGetReq.MultiRowQueryCriteria, mqCriteria) 870 batchGetResponse, error = client.BatchGetRow(batchGetReq) 871 c.Check(error, Equals, nil) 872 873 c.Check(len(batchGetResponse.TableToRowsResult), Equals, 1) 874 c.Check(len(batchGetResponse.TableToRowsResult[mqCriteria.TableName]), Equals, rowCount) 875 876 count := 0 877 for index, rowToCheck := range batchGetResponse.TableToRowsResult[mqCriteria.TableName] { 878 c.Check(rowToCheck.Index, Equals, int32(index)) 879 c.Check(rowToCheck.TableName, Equals, mqCriteria.TableName) 880 c.Check(rowToCheck.IsSucceed, Equals, true) 881 882 if len(rowToCheck.PrimaryKey.PrimaryKeys) > 0 { 883 c.Check(len(rowToCheck.Columns), Equals, 3) 884 count++ 885 } 886 } 887 c.Check(count, Equals, 1) 888 889 fmt.Println("TestBatchGetRowWithFilter finished") 890} 891 892func (s *TableStoreSuite) TestBatchWriteRow(c *C) { 893 fmt.Println("TestBatchWriteRow started") 894 895 PrepareDataInDefaultTable("updateinbatchkey1", "updateinput1") 896 PrepareDataInDefaultTable("deleteinbatchkey1", "deleteinput1") 897 batchWriteReq := &BatchWriteRowRequest{} 898 899 rowToPut1 := CreatePutRowChange("putinbatchkey1", "datainput1") 900 rowToPut2 := CreatePutRowChange("putinbatchkey2", "datainput2") 901 902 updateRowChange := new(UpdateRowChange) 903 updateRowChange.TableName = defaultTableName 904 updatePk := new(PrimaryKey) 905 updatePk.AddPrimaryKeyColumn("pk1", "updateinbatchkey1") 906 updateRowChange.PrimaryKey = updatePk 907 updateRowChange.DeleteColumn("col1") 908 updateRowChange.PutColumn("col2", int64(77)) 909 updateRowChange.PutColumn("col3", "newcol3") 910 updateRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST) 911 912 deleteRowChange := new(DeleteRowChange) 913 deleteRowChange.TableName = defaultTableName 914 deletePk := new(PrimaryKey) 915 deletePk.AddPrimaryKeyColumn("pk1", "deleteinbatchkey1") 916 deleteRowChange.PrimaryKey = deletePk 917 deleteRowChange.SetCondition(RowExistenceExpectation_EXPECT_EXIST) 918 919 batchWriteReq.AddRowChange(rowToPut1) 920 batchWriteReq.AddRowChange(rowToPut2) 921 batchWriteReq.AddRowChange(updateRowChange) 922 batchWriteReq.AddRowChange(deleteRowChange) 923 924 batchWriteResponse, error := client.BatchWriteRow(batchWriteReq) 925 c.Check(error, Equals, nil) 926 c.Check(len(batchWriteResponse.TableToRowsResult), Equals, 1) 927 928 for index, rowToCheck := range batchWriteResponse.TableToRowsResult[defaultTableName] { 929 c.Check(rowToCheck.Index, Equals, int32(index)) 930 c.Check(rowToCheck.TableName, Equals, defaultTableName) 931 c.Check(rowToCheck.IsSucceed, Equals, true) 932 } 933 934 _, error = invalidClient.BatchWriteRow(batchWriteReq) 935 c.Check(error, NotNil) 936 937 fmt.Println("TestBatchWriteRow finished") 938} 939 940func (s *TableStoreSuite) TestGetRange(c *C) { 941 fmt.Println("TestGetRange started") 942 rowCount := 9 943 timeNow := time.Now().Unix() * 1000 944 for i := 0; i < rowCount; i++ { 945 key := "getrange" + strconv.Itoa(i) 946 value := "value" + strconv.Itoa(i) 947 PrepareDataInDefaultTableWithTimestamp(key, value, timeNow) 948 } 949 950 getRangeRequest := &GetRangeRequest{} 951 rangeRowQueryCriteria := &RangeRowQueryCriteria{} 952 rangeRowQueryCriteria.TableName = defaultTableName 953 start := 1 954 end := 8 955 startPK := new(PrimaryKey) 956 startPK.AddPrimaryKeyColumn("pk1", "getrange"+strconv.Itoa(start)) 957 endPK := new(PrimaryKey) 958 endPK.AddPrimaryKeyColumn("pk1", "getrange"+strconv.Itoa(end)) 959 rangeRowQueryCriteria.StartPrimaryKey = startPK 960 rangeRowQueryCriteria.EndPrimaryKey = endPK 961 rangeRowQueryCriteria.Direction = FORWARD 962 rangeRowQueryCriteria.MaxVersion = 1 963 rangeRowQueryCriteria.ColumnsToGet = []string{"col1"} 964 getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria 965 966 fmt.Println("check", rangeRowQueryCriteria.ColumnsToGet) 967 fmt.Println("check2", getRangeRequest.RangeRowQueryCriteria.ColumnsToGet) 968 getRangeResp, error := client.GetRange(getRangeRequest) 969 c.Check(error, Equals, nil) 970 c.Check(getRangeResp.Rows, NotNil) 971 count := end - start 972 c.Check(len(getRangeResp.Rows), Equals, count) 973 c.Check(len(getRangeResp.Rows[0].Columns), Equals, 1) 974 c.Check(getRangeResp.NextStartPrimaryKey, IsNil) 975 976 getRangeRequest = &GetRangeRequest{} 977 rangeRowQueryCriteria = &RangeRowQueryCriteria{} 978 rangeRowQueryCriteria.TableName = defaultTableName 979 980 rangeRowQueryCriteria.StartPrimaryKey = endPK 981 rangeRowQueryCriteria.EndPrimaryKey = startPK 982 rangeRowQueryCriteria.Direction = BACKWARD 983 rangeRowQueryCriteria.MaxVersion = 1 984 getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria 985 getRangeResp, error = client.GetRange(getRangeRequest) 986 c.Check(error, Equals, nil) 987 c.Check(getRangeResp.Rows, NotNil) 988 989 fmt.Println("use time range to query rows") 990 991 rangeRowQueryCriteria.TimeRange = &TimeRange{Specific: timeNow - 100001} 992 getRangeResp, error = client.GetRange(getRangeRequest) 993 c.Check(error, NotNil) 994 fmt.Println(error) 995 996 fmt.Println("use time range to query rows 2") 997 rangeRowQueryCriteria.TimeRange = &TimeRange{Start: timeNow + 1, End: timeNow + 2} 998 getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria 999 getRangeResp2, error := client.GetRange(getRangeRequest) 1000 1001 c.Check(error, Equals, nil) 1002 c.Check(getRangeResp2.Rows, NotNil) 1003 c.Check(len(getRangeResp2.Rows), Equals, count) 1004 c.Check(len(getRangeResp2.Rows[0].Columns), Equals, 0) 1005 1006 _, error = invalidClient.GetRange(getRangeRequest) 1007 c.Check(error, NotNil) 1008 fmt.Println("TestGetRange finished") 1009} 1010 1011func (s *TableStoreSuite) TestGetRangeWithPagination(c *C) { 1012 fmt.Println("TestGetRangeWithPagination started") 1013 rowCount := 9 1014 for i := 0; i < rowCount; i++ { 1015 key := "testrangequery" + strconv.Itoa(i) 1016 value := "value" + strconv.Itoa(i) 1017 PrepareDataInDefaultTable(key, value) 1018 } 1019 1020 getRangeRequest := &GetRangeRequest{} 1021 rangeRowQueryCriteria := &RangeRowQueryCriteria{} 1022 rangeRowQueryCriteria.TableName = defaultTableName 1023 start := 1 1024 end := 8 1025 var limit int32 = 3 1026 startPK := new(PrimaryKey) 1027 startPK.AddPrimaryKeyColumn("pk1", "testrangequery"+strconv.Itoa(start)) 1028 endPK := new(PrimaryKey) 1029 endPK.AddPrimaryKeyColumn("pk1", "testrangequery"+strconv.Itoa(end)) 1030 rangeRowQueryCriteria.StartPrimaryKey = startPK 1031 rangeRowQueryCriteria.EndPrimaryKey = endPK 1032 rangeRowQueryCriteria.Direction = FORWARD 1033 rangeRowQueryCriteria.MaxVersion = 1 1034 rangeRowQueryCriteria.Limit = limit 1035 getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria 1036 1037 getRangeResp, error := client.GetRange(getRangeRequest) 1038 1039 c.Check(error, Equals, nil) 1040 c.Check(getRangeResp.Rows, NotNil) 1041 1042 c.Check(len(getRangeResp.Rows), Equals, int(limit)) 1043 c.Check(getRangeResp.NextStartPrimaryKey, NotNil) 1044 fmt.Println("TestGetRangeWithPagination finished") 1045} 1046 1047func (s *TableStoreSuite) TestGetRangeWithFilter(c *C) { 1048 fmt.Println("TestGetRange started") 1049 rowCount := 20 1050 timeNow := time.Now().Unix() * 1000 1051 for i := 0; i < rowCount; i++ { 1052 key := "zgetrangetest" + strconv.Itoa(i) 1053 value := "value" + strconv.Itoa(i) 1054 PrepareDataInRangeTableWithTimestamp("pk1", key, value, timeNow) 1055 } 1056 1057 for i := 0; i < rowCount; i++ { 1058 key := "zgetrangetest2" + strconv.Itoa(i) 1059 value := "value" + strconv.Itoa(i) 1060 PrepareDataInRangeTableWithTimestamp("pk2", key, value, timeNow) 1061 } 1062 1063 for i := 0; i < rowCount; i++ { 1064 key := "zgetrangetest3" + strconv.Itoa(i) 1065 value := "value" + strconv.Itoa(i) 1066 PrepareDataInRangeTableWithTimestamp("pk3", key, value, timeNow) 1067 } 1068 1069 getRangeRequest := &GetRangeRequest{} 1070 rangeRowQueryCriteria := &RangeRowQueryCriteria{} 1071 rangeRowQueryCriteria.TableName = rangeQueryTableName 1072 1073 startPK := new(PrimaryKey) 1074 startPK.AddPrimaryKeyColumnWithMinValue("pk1") 1075 startPK.AddPrimaryKeyColumnWithMinValue("pk2") 1076 endPK := new(PrimaryKey) 1077 endPK.AddPrimaryKeyColumnWithMaxValue("pk1") 1078 endPK.AddPrimaryKeyColumnWithMaxValue("pk2") 1079 rangeRowQueryCriteria.StartPrimaryKey = startPK 1080 rangeRowQueryCriteria.EndPrimaryKey = endPK 1081 rangeRowQueryCriteria.Direction = FORWARD 1082 rangeRowQueryCriteria.MaxVersion = 1 1083 filter := NewCompositeColumnCondition(LogicalOperator(LO_AND)) 1084 filter1 := NewSingleColumnCondition("pk2", ComparatorType(CT_GREATER_EQUAL), "pk3") 1085 filter2 := NewSingleColumnCondition("pk2", ComparatorType(CT_LESS_EQUAL), "pk3") 1086 filter.AddFilter(filter2) 1087 filter.AddFilter(filter1) 1088 rangeRowQueryCriteria.Filter = filter 1089 getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria 1090 1091 getRangeResp, error := client.GetRange(getRangeRequest) 1092 c.Check(error, Equals, nil) 1093 fmt.Println(getRangeResp) 1094 fmt.Println(getRangeResp.NextStartPrimaryKey) 1095 fmt.Println(getRangeResp.Rows) 1096 //fmt.Println(getRangeResp.NextStartPrimaryKey) 1097 //c.Check(getRangeResp.Rows, NotNil) 1098 1099 fmt.Println("TestGetRange with filter finished") 1100} 1101 1102func (s *TableStoreSuite) TestGetRangeWithMinMaxValue(c *C) { 1103 fmt.Println("TestGetRangeWithMinMaxValue started") 1104 1105 getRangeRequest := &GetRangeRequest{} 1106 rangeRowQueryCriteria := &RangeRowQueryCriteria{} 1107 rangeRowQueryCriteria.TableName = defaultTableName 1108 1109 var limit int32 = 8 1110 startPK := new(PrimaryKey) 1111 startPK.AddPrimaryKeyColumnWithMinValue("pk1") 1112 endPK := new(PrimaryKey) 1113 endPK.AddPrimaryKeyColumnWithMaxValue("pk1") 1114 rangeRowQueryCriteria.StartPrimaryKey = startPK 1115 rangeRowQueryCriteria.EndPrimaryKey = endPK 1116 rangeRowQueryCriteria.Direction = FORWARD 1117 rangeRowQueryCriteria.MaxVersion = 1 1118 rangeRowQueryCriteria.Limit = limit 1119 getRangeRequest.RangeRowQueryCriteria = rangeRowQueryCriteria 1120 1121 getRangeResp, error := client.GetRange(getRangeRequest) 1122 1123 c.Check(error, Equals, nil) 1124 c.Check(getRangeResp.Rows, NotNil) 1125 1126 c.Check(len(getRangeResp.Rows), Equals, int(limit)) 1127 c.Check(getRangeResp.NextStartPrimaryKey, NotNil) 1128 fmt.Println("TestGetRangeWithMinMaxValue finished") 1129} 1130 1131func (s *TableStoreSuite) TestPutRowsWorkload(c *C) { 1132 fmt.Println("TestPutRowsWorkload started") 1133 1134 start := time.Now().UnixNano() 1135 1136 isFinished := make(chan bool) 1137 totalCount := 100 1138 for i := 0; i < totalCount; i++ { 1139 value := i * 10000 1140 go func(index int) { 1141 for j := 0; j < 100; j++ { 1142 currentIndex := index + j 1143 rowToPut1 := CreatePutRowChange("workloadtestkey"+strconv.Itoa(currentIndex), "perfdata1") 1144 putRowRequest := new(PutRowRequest) 1145 putRowRequest.PutRowChange = rowToPut1 1146 _, error := client.PutRow(putRowRequest) 1147 if error != nil { 1148 fmt.Println("put row error", error) 1149 } 1150 c.Check(error, IsNil) 1151 } 1152 1153 isFinished <- true 1154 }(value) 1155 } 1156 1157 /*go func(){ 1158 time.Sleep(time.Millisecond * 1000 * 10) 1159 close(isFinished) 1160 }()*/ 1161 1162 count := 0 1163 for _ = range isFinished { 1164 count++ 1165 fmt.Println("catched count is:", count) 1166 if count >= totalCount { 1167 close(isFinished) 1168 } 1169 } 1170 c.Check(count, Equals, totalCount) 1171 end := time.Now().UnixNano() 1172 1173 totalCost := (end - start) / 1000000 1174 fmt.Println("total cost:", totalCost) 1175 c.Check(totalCost < 30*1000, Equals, true) 1176 1177 time.Sleep(time.Millisecond * 20) 1178 fmt.Println("TestPutRowsWorkload finished") 1179} 1180 1181func (s *TableStoreSuite) TestFailureCase(c *C) { 1182 tableName := randStringRunes(200) 1183 createtableRequest := new(CreateTableRequest) 1184 tableMeta := new(TableMeta) 1185 tableMeta.TableName = tableName 1186 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 1187 tableOption := new(TableOption) 1188 tableOption.TimeToAlive = -1 1189 tableOption.MaxVersion = 3 1190 reservedThroughput := new(ReservedThroughput) 1191 reservedThroughput.Readcap = 0 1192 reservedThroughput.Writecap = 0 1193 createtableRequest.TableMeta = tableMeta 1194 createtableRequest.TableOption = tableOption 1195 createtableRequest.ReservedThroughput = reservedThroughput 1196 _, error := client.CreateTable(createtableRequest) 1197 c.Check(error, NotNil) 1198 c.Check(error.Error(), Equals, errTableNameTooLong(tableName).Error()) 1199 1200 createtableRequest = new(CreateTableRequest) 1201 tableMeta = new(TableMeta) 1202 tableMeta.TableName = tableNamePrefix + "pktomuch" 1203 1204 tableOption = new(TableOption) 1205 tableOption.TimeToAlive = -1 1206 tableOption.MaxVersion = 3 1207 reservedThroughput = new(ReservedThroughput) 1208 reservedThroughput.Readcap = 0 1209 reservedThroughput.Writecap = 0 1210 createtableRequest.TableMeta = tableMeta 1211 createtableRequest.TableOption = tableOption 1212 createtableRequest.ReservedThroughput = reservedThroughput 1213 _, error = client.CreateTable(createtableRequest) 1214 c.Check(error, NotNil) 1215 c.Check(error.Error(), Equals, errCreateTableNoPrimaryKey.Error()) 1216 1217 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 1218 tableMeta.AddPrimaryKeyColumn("pk2", PrimaryKeyType_STRING) 1219 tableMeta.AddPrimaryKeyColumn("pk3", PrimaryKeyType_STRING) 1220 tableMeta.AddPrimaryKeyColumn("pk4", PrimaryKeyType_STRING) 1221 tableMeta.AddPrimaryKeyColumn("pk5", PrimaryKeyType_STRING) 1222 1223 _, error = client.CreateTable(createtableRequest) 1224 c.Check(error, NotNil) 1225 c.Check(error.Error(), Equals, errPrimaryKeyTooMuch.Error()) 1226 1227 request := &PutRowRequest{} 1228 _, error = client.PutRow(request) 1229 c.Check(error, IsNil) 1230 1231 _, error = client.PutRow(nil) 1232 c.Check(error, IsNil) 1233 1234 _, err := invalidClient.ListTable() 1235 c.Check(err, NotNil) 1236 1237 tableName = tableNamePrefix + "tablenotexist" 1238 deleteReq := new(DeleteTableRequest) 1239 deleteReq.TableName = tableName 1240 _, err = client.DeleteTable(deleteReq) 1241 c.Check(err, NotNil) 1242 1243 _, err = invalidClient.ListTable() 1244 c.Check(err, NotNil) 1245 1246 updateTableReq := new(UpdateTableRequest) 1247 updateTableReq.TableName = defaultTableName 1248 updateTableReq.TableOption = new(TableOption) 1249 updateTableReq.TableOption.TimeToAlive = -1 1250 updateTableReq.TableOption.MaxVersion = 5 1251 updateTableReq.ReservedThroughput = &ReservedThroughput{} 1252 updateTableReq.ReservedThroughput.Readcap = 0 1253 1254 _, error = invalidClient.UpdateTable(updateTableReq) 1255 c.Assert(error, NotNil) 1256 1257 describeTableReq := new(DescribeTableRequest) 1258 describeTableReq.TableName = defaultTableName 1259 _, error = invalidClient.DescribeTable(describeTableReq) 1260 c.Assert(error, NotNil) 1261} 1262 1263func (s *TableStoreSuite) TestMockHttpClientCase(c *C) { 1264 fmt.Println("TestMockHttpClientCase started") 1265 currentGetHttpClientFunc = func() IHttpClient { 1266 return &mockHttpClient{} 1267 } 1268 1269 tempClient := NewClientWithConfig("test", "a", "b", "c", "d", NewDefaultTableStoreConfig()) 1270 putRowRequest := new(PutRowRequest) 1271 putRowChange := new(PutRowChange) 1272 putRowChange.TableName = defaultTableName 1273 putPk := new(PrimaryKey) 1274 putPk.AddPrimaryKeyColumn("pk1", "mockkey1") 1275 putRowChange.PrimaryKey = putPk 1276 putRowChange.AddColumn("col1", "col1data1") 1277 putRowChange.AddColumn("col2", int64(100)) 1278 putRowChange.AddColumn("col3", float64(2.1)) 1279 putRowChange.SetCondition(RowExistenceExpectation_EXPECT_NOT_EXIST) 1280 putRowRequest.PutRowChange = putRowChange 1281 data := tempClient.httpClient.(*mockHttpClient) 1282 1283 data.error = fmt.Errorf("test") 1284 _, error := tempClient.PutRow(putRowRequest) 1285 c.Check(error, Equals, data.error) 1286 1287 data.response = &http.Response{} 1288 _, error = tempClient.PutRow(putRowRequest) 1289 c.Check(error, Equals, data.error) 1290 1291 /*data.error = nil 1292 _, error = tempClient.PutRow(putRowRequest) 1293 c.Check(error, Equals, data.error)*/ 1294 1295 currentGetHttpClientFunc = func() IHttpClient { 1296 return &TableStoreHttpClient{} 1297 } 1298 1299 fmt.Println("TestMockHttpClientCase finished") 1300} 1301 1302func (s *TableStoreSuite) TestUnit(c *C) { 1303 otshead := createOtsHeaders("test") 1304 otshead.set(xOtsApiversion, ApiVersion) 1305 _, error := otshead.signature(getRowUri, "POST", "test") 1306 c.Check(error, NotNil) 1307 1308 otshead.set(xOtsDate, "any") 1309 otshead.set(xOtsApiversion, "any") 1310 otshead.set(xOtsAccesskeyid, "any") 1311 otshead.set(xOtsContentmd5, "any") 1312 otshead.set(xOtsInstanceName, "any") 1313 1314 otshead.headers = nil 1315 otshead.set("abc", "def") 1316 1317 result := otshead.search("zz") 1318 c.Check(result, IsNil) 1319 1320 tempClient := NewClient("a", "b", "c", "d", SetSth()) 1321 c.Check(tempClient, NotNil) 1322 config := NewDefaultTableStoreConfig() 1323 tempClient = NewClientWithConfig("a", "b", "c", "d", "e", config) 1324 c.Check(tempClient, NotNil) 1325 1326 errorCode := INTERNAL_SERVER_ERROR 1327 tsClient := client.(*TableStoreClient) 1328 value := getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 10, time.Now().Add(time.Second*1), 10, getRowUri) 1329 c.Check(value == 0, Equals, true) 1330 1331 errorCode = ROW_OPERATION_CONFLICT 1332 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getRowUri) 1333 c.Check(value > 0, Equals, true) 1334 1335 errorCode = STORAGE_TIMEOUT 1336 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, putRowUri) 1337 c.Check(value == 0, Equals, true) 1338 1339 errorCode = STORAGE_TIMEOUT 1340 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getRowUri) 1341 c.Check(value > 0, Equals, true) 1342 1343 errorCode = STORAGE_TIMEOUT 1344 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), MaxRetryInterval, getRowUri) 1345 c.Check(value == MaxRetryInterval, Equals, true) 1346 1347 // stream api 1348 errorCode = STORAGE_TIMEOUT 1349 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getStreamRecordUri) 1350 c.Check(value > 0, Equals, true) 1351 1352 // 502 1353 errorCode = SERVER_UNAVAILABLE 1354 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: "bad gateway"}, 1, time.Now().Add(time.Second*1), 10, getStreamRecordUri) 1355 c.Check(value > 0, Equals, true) 1356 1357 // 502 write 1358 errorCode = SERVER_UNAVAILABLE 1359 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: "bad gateway"}, 1, time.Now().Add(time.Second*1), 10, putRowUri) 1360 c.Check(value == 0, Equals, true) 1361 1362 // 400 normal 1363 errorCode = "OTSPermissionDenied" 1364 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, putRowUri) 1365 c.Check(value == 0, Equals, true) 1366 1367 // 400 raw http 1368 errorCode = OTS_CLIENT_UNKNOWN 1369 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, getRowUri) 1370 c.Check(value == 0, Equals, true) 1371 1372 // storage 503 put 1373 errorCode = STORAGE_SERVER_BUSY 1374 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, putRowUri) 1375 c.Check(value > 0, Equals, true) 1376 1377 // storage 503 desc stream 1378 errorCode = STORAGE_SERVER_BUSY 1379 value = getNextPause(tsClient, &OtsError{Code: errorCode, Message: errorCode}, 1, time.Now().Add(time.Second*1), 10, describeStreamUri) 1380 c.Check(value > 0, Equals, true) 1381 1382 // EOF 1383 value = getNextPause(tsClient, io.EOF, 1, time.Now().Add(time.Second*1), 10, putRowUri) 1384 c.Check(value > 0, Equals, true) 1385 1386 // connection rest 1387 value = getNextPause(tsClient, syscall.ECONNRESET, 1, time.Now().Add(time.Second*1), 10, putRowUri) 1388 c.Check(value > 0, Equals, true) 1389 1390 getResp := &GetRowResponse{} 1391 colMap := getResp.GetColumnMap() 1392 c.Check(colMap, NotNil) 1393 1394 getResp = &GetRowResponse{} 1395 col1 := &AttributeColumn{ColumnName: "col1", Value: "value1"} 1396 col2 := &AttributeColumn{ColumnName: "col1", Value: "value2"} 1397 col3 := &AttributeColumn{ColumnName: "col2", Value: "value3"} 1398 1399 getResp.Columns = append(getResp.Columns, col1) 1400 getResp.Columns = append(getResp.Columns, col2) 1401 getResp.Columns = append(getResp.Columns, col3) 1402 colMap = getResp.GetColumnMap() 1403 c.Check(colMap, NotNil) 1404 cols := colMap.Columns["col1"] 1405 c.Check(cols, NotNil) 1406 c.Check(len(cols), Equals, 2) 1407 1408 cols2 := colMap.Columns["col2"] 1409 c.Check(cols2, NotNil) 1410 c.Check(len(cols2), Equals, 1) 1411 1412 cols3, _ := colMap.GetRange(1, 1) 1413 1414 c.Check(cols3, NotNil) 1415 c.Check(len(cols3), Equals, 1) 1416 1417 var resp2 *GetRowResponse 1418 resp2 = nil 1419 c.Check(resp2.GetColumnMap(), IsNil) 1420} 1421 1422func SetSth() ClientOption { 1423 return func(client *TableStoreClient) { 1424 fmt.Println(client.accessKeyId) 1425 } 1426} 1427 1428func CreatePutRowChange(pkValue, colValue string) *PutRowChange { 1429 putRowChange := new(PutRowChange) 1430 putRowChange.TableName = defaultTableName 1431 putPk := new(PrimaryKey) 1432 putPk.AddPrimaryKeyColumn("pk1", pkValue) 1433 putRowChange.PrimaryKey = putPk 1434 putRowChange.AddColumn("col1", colValue) 1435 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 1436 return putRowChange 1437} 1438 1439type mockHttpClient struct { 1440 response *http.Response 1441 error error 1442 httpClient *http.Client 1443} 1444 1445func (mockHttpClient *mockHttpClient) Do(req *http.Request) (*http.Response, error) { 1446 return mockHttpClient.response, mockHttpClient.error 1447} 1448 1449func (mockHttpClient *mockHttpClient) New(client *http.Client) { 1450 mockHttpClient.httpClient = client 1451} 1452 1453var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") 1454 1455func randStringRunes(n int) string { 1456 random := rand.New(rand.NewSource(time.Now().Unix())) 1457 1458 b := make([]rune, n) 1459 for i := range b { 1460 b[i] = letterRunes[random.Intn(len(letterRunes))] 1461 } 1462 return string(b) 1463} 1464 1465func PrepareDataInDefaultTable(key string, value string) error { 1466 putRowRequest := new(PutRowRequest) 1467 putRowChange := new(PutRowChange) 1468 putRowChange.TableName = defaultTableName 1469 putPk := new(PrimaryKey) 1470 putPk.AddPrimaryKeyColumn("pk1", key) 1471 putRowChange.AddColumn("col1", value) 1472 putRowChange.PrimaryKey = putPk 1473 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 1474 putRowRequest.PutRowChange = putRowChange 1475 _, error := client.PutRow(putRowRequest) 1476 return error 1477} 1478 1479func PrepareDataInDefaultTableWithMultiAttribute(key string, value1 string, value2 string, value3 string) error { 1480 putRowRequest := new(PutRowRequest) 1481 putRowChange := new(PutRowChange) 1482 putRowChange.TableName = defaultTableName 1483 putPk := new(PrimaryKey) 1484 putPk.AddPrimaryKeyColumn("pk1", key) 1485 putRowChange.AddColumn("col1", value1) 1486 putRowChange.AddColumn("col2", value2) 1487 putRowChange.AddColumn("col3", value3) 1488 putRowChange.PrimaryKey = putPk 1489 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 1490 putRowRequest.PutRowChange = putRowChange 1491 _, error := client.PutRow(putRowRequest) 1492 return error 1493} 1494 1495func PrepareDataInDefaultTableWithTimestamp(key string, value string, timeNow int64) error { 1496 putRowRequest := new(PutRowRequest) 1497 putRowChange := new(PutRowChange) 1498 putRowChange.TableName = defaultTableName 1499 putPk := new(PrimaryKey) 1500 putPk.AddPrimaryKeyColumn("pk1", key) 1501 putRowChange.PrimaryKey = putPk 1502 putRowChange.AddColumnWithTimestamp("col1", value, timeNow) 1503 putRowChange.AddColumnWithTimestamp("col2", value, timeNow) 1504 putRowChange.AddColumnWithTimestamp("col3", value, timeNow) 1505 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 1506 putRowRequest.PutRowChange = putRowChange 1507 _, error := client.PutRow(putRowRequest) 1508 return error 1509} 1510 1511func PrepareDataInRangeTableWithTimestamp(key1 string, key2 string, value string, timeNow int64) error { 1512 putRowRequest := new(PutRowRequest) 1513 putRowChange := new(PutRowChange) 1514 putRowChange.TableName = rangeQueryTableName 1515 putPk := new(PrimaryKey) 1516 putPk.AddPrimaryKeyColumn("pk1", key1) 1517 putPk.AddPrimaryKeyColumn("pk2", key2) 1518 putRowChange.PrimaryKey = putPk 1519 putRowChange.AddColumnWithTimestamp("col1", value, timeNow) 1520 putRowChange.SetCondition(RowExistenceExpectation_IGNORE) 1521 putRowRequest.PutRowChange = putRowChange 1522 _, error := client.PutRow(putRowRequest) 1523 return error 1524} 1525 1526func (s *TableStoreSuite) TestListStream(c *C) { 1527 tableName := defaultTableName + "_ListStream" 1528 fmt.Printf("TestListStream starts on table %s\n", tableName) 1529 { 1530 err := PrepareTable(tableName) 1531 c.Assert(err, IsNil) 1532 } 1533 defer client.DeleteTable(&DeleteTableRequest{TableName: tableName}) 1534 { 1535 resp, err := client.DescribeTable(&DescribeTableRequest{TableName: tableName}) 1536 c.Assert(err, IsNil) 1537 c.Assert(resp.StreamDetails, NotNil) 1538 c.Assert(resp.StreamDetails.EnableStream, Equals, false) 1539 c.Assert(resp.StreamDetails.StreamId, IsNil) 1540 c.Assert(resp.StreamDetails.ExpirationTime, Equals, int32(0)) 1541 c.Assert(resp.StreamDetails.LastEnableTime, Equals, int64(0)) 1542 } 1543 { 1544 resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName}) 1545 c.Assert(err, IsNil) 1546 fmt.Printf("%v\n", resp) 1547 c.Assert(len(resp.Streams), Equals, 0) 1548 } 1549 { 1550 resp, err := client.UpdateTable(&UpdateTableRequest{ 1551 TableName: tableName, 1552 StreamSpec: &StreamSpecification{EnableStream: true, ExpirationTime: 24}}) 1553 c.Assert(err, IsNil) 1554 c.Assert(resp.StreamDetails, NotNil) 1555 } 1556 { 1557 resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName}) 1558 c.Assert(err, IsNil) 1559 fmt.Printf("%#v\n", resp) 1560 c.Assert(len(resp.Streams), Equals, 1) 1561 } 1562 { 1563 resp, err := client.DescribeTable(&DescribeTableRequest{TableName: tableName}) 1564 c.Assert(err, IsNil) 1565 c.Assert(resp.StreamDetails, NotNil) 1566 fmt.Printf("%#v\n", resp) 1567 c.Assert(resp.StreamDetails.EnableStream, Equals, true) 1568 c.Assert(resp.StreamDetails.StreamId, NotNil) 1569 c.Assert(resp.StreamDetails.ExpirationTime, Equals, int32(24)) 1570 c.Assert(resp.StreamDetails.LastEnableTime > 0, Equals, true) 1571 } 1572 fmt.Println("TestListStream finish") 1573} 1574 1575func (s *TableStoreSuite) TestCreateTableWithStream(c *C) { 1576 tableName := defaultTableName + "_CreateTableWithStream" 1577 fmt.Printf("TestCreateTableWithStream starts on table %s\n", tableName) 1578 { 1579 req := CreateTableRequest{} 1580 tableMeta := TableMeta{} 1581 tableMeta.TableName = tableName 1582 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 1583 req.TableMeta = &tableMeta 1584 1585 tableOption := TableOption{} 1586 tableOption.TimeToAlive = -1 1587 tableOption.MaxVersion = 3 1588 req.TableOption = &tableOption 1589 1590 req.ReservedThroughput = &ReservedThroughput{Readcap: 0, Writecap: 0} 1591 1592 req.StreamSpec = &StreamSpecification{EnableStream: true, ExpirationTime: 24} 1593 1594 _, err := client.CreateTable(&req) 1595 c.Assert(err, IsNil) 1596 } 1597 defer client.DeleteTable(&DeleteTableRequest{TableName: tableName}) 1598 { 1599 resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName}) 1600 c.Assert(err, IsNil) 1601 fmt.Printf("%#v\n", resp) 1602 c.Assert(len(resp.Streams), Equals, 1) 1603 } 1604 fmt.Println("TestCreateTableWithStream finish") 1605} 1606 1607func (s *TableStoreSuite) TestStream(c *C) { 1608 tableName := defaultTableName + "_Stream" 1609 fmt.Printf("TestCreateTableWithStream starts on table %s\n", tableName) 1610 { 1611 req := CreateTableRequest{} 1612 tableMeta := TableMeta{} 1613 tableMeta.TableName = tableName 1614 tableMeta.AddPrimaryKeyColumn("pk1", PrimaryKeyType_STRING) 1615 req.TableMeta = &tableMeta 1616 1617 tableOption := TableOption{} 1618 tableOption.TimeToAlive = -1 1619 tableOption.MaxVersion = 3 1620 req.TableOption = &tableOption 1621 1622 req.ReservedThroughput = &ReservedThroughput{Readcap: 0, Writecap: 0} 1623 1624 req.StreamSpec = &StreamSpecification{EnableStream: true, ExpirationTime: 24} 1625 1626 _, err := client.CreateTable(&req) 1627 c.Assert(err, IsNil) 1628 } 1629 defer client.DeleteTable(&DeleteTableRequest{TableName: tableName}) 1630 var streamId *StreamId 1631 { 1632 resp, err := client.ListStream(&ListStreamRequest{TableName: &tableName}) 1633 c.Assert(err, IsNil) 1634 fmt.Printf("%#v\n", resp) 1635 c.Assert(len(resp.Streams), Equals, 1) 1636 streamId = resp.Streams[0].Id 1637 } 1638 c.Assert(streamId, NotNil) 1639 var shardId *ShardId 1640 for { 1641 resp, err := client.DescribeStream(&DescribeStreamRequest{StreamId: streamId}) 1642 c.Assert(err, IsNil) 1643 fmt.Printf("DescribeStreamResponse: %#v\n", resp) 1644 c.Assert(*resp.StreamId, Equals, *streamId) 1645 c.Assert(resp.ExpirationTime, Equals, int32(24)) 1646 c.Assert(*resp.TableName, Equals, tableName) 1647 c.Assert(len(resp.Shards), Equals, 1) 1648 fmt.Printf("StreamShard: %#v\n", resp.Shards[0]) 1649 shardId = resp.Shards[0].SelfShard 1650 if resp.Status == SS_Active { 1651 break 1652 } 1653 } 1654 c.Assert(shardId, NotNil) 1655 var iter *ShardIterator 1656 var records []*StreamRecord 1657 { 1658 resp, err := client.GetShardIterator(&GetShardIteratorRequest{ 1659 StreamId: streamId, 1660 ShardId: shardId}) 1661 c.Assert(err, IsNil) 1662 c.Assert(resp.ShardIterator, NotNil) 1663 iter = resp.ShardIterator 1664 } 1665 fmt.Printf("init iterator: %#v\n", *iter) 1666 iter, _ = exhaustStreamRecords(c, iter) 1667 fmt.Printf("put row:\n") 1668 { 1669 req := PutRowRequest{} 1670 rowChange := PutRowChange{} 1671 rowChange.TableName = tableName 1672 pk := PrimaryKey{} 1673 pk.AddPrimaryKeyColumn("pk1", "rowkey") 1674 rowChange.PrimaryKey = &pk 1675 rowChange.AddColumn("colToDel", "abc") 1676 rowChange.AddColumn("colToDelAll", true) 1677 rowChange.AddColumn("colToUpdate", int64(123)) 1678 rowChange.SetCondition(RowExistenceExpectation_IGNORE) 1679 req.PutRowChange = &rowChange 1680 _, err := client.PutRow(&req) 1681 c.Assert(err, IsNil) 1682 } 1683 iter, records = exhaustStreamRecords(c, iter) 1684 var timestamp int64 1685 { 1686 c.Assert(len(records), Equals, 1) 1687 r := records[0] 1688 c.Assert(r.Type, Equals, AT_Put) 1689 c.Assert(r.Info, NotNil) 1690 c.Assert(r.PrimaryKey, NotNil) 1691 1692 pkey := r.PrimaryKey 1693 c.Assert(len(pkey.PrimaryKeys), Equals, 1) 1694 pkc := pkey.PrimaryKeys[0] 1695 c.Assert(pkc, NotNil) 1696 c.Assert(pkc.ColumnName, Equals, "pk1") 1697 c.Assert(pkc.Value, Equals, "rowkey") 1698 c.Assert(pkc.PrimaryKeyOption, Equals, NONE) 1699 1700 c.Assert(len(r.Columns), Equals, 3) 1701 attr0 := r.Columns[0] 1702 attr1 := r.Columns[1] 1703 attr2 := r.Columns[2] 1704 c.Assert(attr0, NotNil) 1705 c.Assert(*attr0.Name, Equals, "colToDel") 1706 c.Assert(attr0.Type, Equals, RCT_Put) 1707 c.Assert(attr0.Value, Equals, "abc") 1708 c.Assert(attr1, NotNil) 1709 c.Assert(*attr1.Name, Equals, "colToDelAll") 1710 c.Assert(attr1.Type, Equals, RCT_Put) 1711 c.Assert(attr1.Value, Equals, true) 1712 timestamp = *attr0.Timestamp 1713 c.Assert(attr2, NotNil) 1714 c.Assert(*attr2.Name, Equals, "colToUpdate") 1715 c.Assert(attr2.Type, Equals, RCT_Put) 1716 c.Assert(attr2.Value, Equals, int64(123)) 1717 } 1718 { 1719 chg := UpdateRowChange{} 1720 chg.TableName = tableName 1721 pk := PrimaryKey{} 1722 pk.AddPrimaryKeyColumn("pk1", "rowkey") 1723 chg.PrimaryKey = &pk 1724 chg.SetCondition(RowExistenceExpectation_IGNORE) 1725 chg.DeleteColumnWithTimestamp("colToDel", timestamp) 1726 chg.DeleteColumn("colToDelAll") 1727 chg.PutColumn("colToUpdate", 3.14) 1728 _, err := client.UpdateRow(&UpdateRowRequest{UpdateRowChange: &chg}) 1729 c.Assert(err, IsNil) 1730 } 1731 iter, records = exhaustStreamRecords(c, iter) 1732 { 1733 c.Assert(len(records), Equals, 1) 1734 r := records[0] 1735 c.Assert(r.Type, Equals, AT_Update) 1736 c.Assert(r.Info, NotNil) 1737 c.Assert(r.PrimaryKey, NotNil) 1738 1739 pkey := r.PrimaryKey 1740 c.Assert(len(pkey.PrimaryKeys), Equals, 1) 1741 pkc := pkey.PrimaryKeys[0] 1742 c.Assert(pkc, NotNil) 1743 c.Assert(pkc.ColumnName, Equals, "pk1") 1744 c.Assert(pkc.Value, Equals, "rowkey") 1745 c.Assert(pkc.PrimaryKeyOption, Equals, NONE) 1746 1747 c.Assert(len(r.Columns), Equals, 3) 1748 attr0 := r.Columns[0] 1749 attr1 := r.Columns[1] 1750 attr2 := r.Columns[2] 1751 c.Assert(attr0, NotNil) 1752 c.Assert(*attr0.Name, Equals, "colToDel") 1753 c.Assert(attr0.Type, Equals, RCT_DeleteOneVersion) 1754 c.Assert(attr0.Value, IsNil) 1755 c.Assert(attr0.Timestamp, NotNil) 1756 c.Assert(*attr0.Timestamp, Equals, timestamp) 1757 c.Assert(attr1, NotNil) 1758 c.Assert(*attr1.Name, Equals, "colToDelAll") 1759 c.Assert(attr1.Type, Equals, RCT_DeleteAllVersions) 1760 c.Assert(attr1.Value, IsNil) 1761 c.Assert(attr1.Timestamp, IsNil) 1762 c.Assert(attr2, NotNil) 1763 c.Assert(*attr2.Name, Equals, "colToUpdate") 1764 c.Assert(attr2.Type, Equals, RCT_Put) 1765 c.Assert(attr2.Value, Equals, 3.14) 1766 } 1767 { 1768 chg := DeleteRowChange{} 1769 chg.TableName = tableName 1770 pk := PrimaryKey{} 1771 pk.AddPrimaryKeyColumn("pk1", "rowkey") 1772 chg.PrimaryKey = &pk 1773 chg.SetCondition(RowExistenceExpectation_IGNORE) 1774 _, err := client.DeleteRow(&DeleteRowRequest{DeleteRowChange: &chg}) 1775 c.Assert(err, IsNil) 1776 } 1777 iter, records = exhaustStreamRecords(c, iter) 1778 { 1779 c.Assert(len(records), Equals, 1) 1780 r := records[0] 1781 c.Assert(r.Type, Equals, AT_Delete) 1782 c.Assert(r.Info, NotNil) 1783 c.Assert(r.PrimaryKey, NotNil) 1784 1785 pkey := r.PrimaryKey 1786 c.Assert(len(pkey.PrimaryKeys), Equals, 1) 1787 pkc := pkey.PrimaryKeys[0] 1788 c.Assert(pkc, NotNil) 1789 c.Assert(pkc.ColumnName, Equals, "pk1") 1790 c.Assert(pkc.Value, Equals, "rowkey") 1791 c.Assert(pkc.PrimaryKeyOption, Equals, NONE) 1792 1793 c.Assert(len(r.Columns), Equals, 0) 1794 } 1795 fmt.Println("TestCreateTableWithStream finish") 1796} 1797 1798func exhaustStreamRecords(c *C, iter *ShardIterator) (*ShardIterator, []*StreamRecord) { 1799 records := make([]*StreamRecord, 0) 1800 for { 1801 resp, err := client.GetStreamRecord(&GetStreamRecordRequest{ 1802 ShardIterator: iter}) 1803 c.Assert(err, IsNil) 1804 fmt.Printf("#records: %d\n", len(resp.Records)) 1805 for i, rec := range resp.Records { 1806 fmt.Printf("record %d: %s\n", i, rec) 1807 } 1808 for _, rec := range resp.Records { 1809 records = append(records, rec) 1810 } 1811 nextIter := resp.NextShardIterator 1812 if nextIter == nil { 1813 fmt.Printf("next iterator: %#v\n", nextIter) 1814 break 1815 } else { 1816 fmt.Printf("next iterator: %#v\n", *nextIter) 1817 } 1818 if *iter == *nextIter { 1819 break 1820 } 1821 iter = nextIter 1822 } 1823 return iter, records 1824} 1825