1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "fmt" 22 "math" 23 "math/rand" 24 "sort" 25 "strings" 26 "sync" 27 "testing" 28 "time" 29 30 "cloud.google.com/go/internal/testutil" 31 "github.com/golang/protobuf/proto" 32 "google.golang.org/api/iterator" 33 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 34) 35 36var presidentsSocialGraph = map[string][]string{ 37 "wmckinley": {"tjefferson"}, 38 "gwashington": {"jadams"}, 39 "tjefferson": {"gwashington", "jadams"}, 40 "jadams": {"gwashington", "tjefferson"}, 41} 42 43func populatePresidentsGraph(table *Table) error { 44 ctx := context.Background() 45 for row, ss := range presidentsSocialGraph { 46 mut := NewMutation() 47 for _, name := range ss { 48 mut.Set("follows", name, 1000, []byte("1")) 49 } 50 if err := table.Apply(ctx, row, mut); err != nil { 51 return fmt.Errorf("Mutating row %q: %v", row, err) 52 } 53 } 54 return nil 55} 56 57func TestIntegration_ConditionalMutations(t *testing.T) { 58 ctx := context.Background() 59 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 60 if err != nil { 61 t.Fatal(err) 62 } 63 defer cleanup() 64 65 if err := populatePresidentsGraph(table); err != nil { 66 t.Fatal(err) 67 } 68 69 // Do a conditional mutation with a complex filter. 
70 mutTrue := NewMutation() 71 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 72 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 73 mut := NewCondMutation(filter, mutTrue, nil) 74 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 75 t.Fatalf("Conditionally mutating row: %v", err) 76 } 77 // Do a second condition mutation with a filter that does not match, 78 // and thus no changes should be made. 79 mutTrue = NewMutation() 80 mutTrue.DeleteRow() 81 filter = ColumnFilter("snoop.dogg") 82 mut = NewCondMutation(filter, mutTrue, nil) 83 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 84 t.Fatalf("Conditionally mutating row: %v", err) 85 } 86 87 // Fetch a row. 88 row, err := table.ReadRow(ctx, "jadams") 89 if err != nil { 90 t.Fatalf("Reading a row: %v", err) 91 } 92 wantRow := Row{ 93 "follows": []ReadItem{ 94 {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 95 {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 96 }, 97 } 98 if !testutil.Equal(row, wantRow) { 99 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 100 } 101} 102 103func TestIntegration_PartialReadRows(t *testing.T) { 104 ctx := context.Background() 105 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 106 if err != nil { 107 t.Fatal(err) 108 } 109 defer cleanup() 110 111 if err := populatePresidentsGraph(table); err != nil { 112 t.Fatal(err) 113 } 114 115 // Do a scan and stop part way through. 116 // Verify that the ReadRows callback doesn't keep running. 
117 stopped := false 118 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 119 if r.Key() < "h" { 120 return true 121 } 122 if !stopped { 123 stopped = true 124 return false 125 } 126 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 127 return false 128 }) 129 if err != nil { 130 t.Fatalf("Partial ReadRows: %v", err) 131 } 132} 133 134func TestIntegration_ReadRowList(t *testing.T) { 135 ctx := context.Background() 136 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 137 if err != nil { 138 t.Fatal(err) 139 } 140 defer cleanup() 141 142 if err := populatePresidentsGraph(table); err != nil { 143 t.Fatal(err) 144 } 145 146 // Read a RowList 147 var elt []string 148 keys := RowList{"wmckinley", "gwashington", "jadams"} 149 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 150 err = table.ReadRows(ctx, keys, func(r Row) bool { 151 for _, ris := range r { 152 for _, ri := range ris { 153 elt = append(elt, formatReadItem(ri)) 154 } 155 } 156 return true 157 }) 158 if err != nil { 159 t.Fatalf("read RowList: %v", err) 160 } 161 162 if got := strings.Join(elt, ","); got != want { 163 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 164 } 165} 166 167func TestIntegration_DeleteRow(t *testing.T) { 168 ctx := context.Background() 169 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 170 if err != nil { 171 t.Fatal(err) 172 } 173 defer cleanup() 174 175 if err := populatePresidentsGraph(table); err != nil { 176 t.Fatal(err) 177 } 178 179 // Delete a row and check it goes away. 
180 mut := NewMutation() 181 mut.DeleteRow() 182 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 183 t.Fatalf("Apply DeleteRow: %v", err) 184 } 185 row, err := table.ReadRow(ctx, "wmckinley") 186 if err != nil { 187 t.Fatalf("Reading a row after DeleteRow: %v", err) 188 } 189 if len(row) != 0 { 190 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 191 } 192} 193 194func TestIntegration_ReadModifyWrite(t *testing.T) { 195 ctx := context.Background() 196 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 197 if err != nil { 198 t.Fatal(err) 199 } 200 defer cleanup() 201 202 if err := populatePresidentsGraph(table); err != nil { 203 t.Fatal(err) 204 } 205 206 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 207 t.Fatalf("Creating column family: %v", err) 208 } 209 210 appendRMW := func(b []byte) *ReadModifyWrite { 211 rmw := NewReadModifyWrite() 212 rmw.AppendValue("counter", "likes", b) 213 return rmw 214 } 215 incRMW := func(n int64) *ReadModifyWrite { 216 rmw := NewReadModifyWrite() 217 rmw.Increment("counter", "likes", n) 218 return rmw 219 } 220 rmwSeq := []struct { 221 desc string 222 rmw *ReadModifyWrite 223 want []byte 224 }{ 225 { 226 desc: "append #1", 227 rmw: appendRMW([]byte{0, 0, 0}), 228 want: []byte{0, 0, 0}, 229 }, 230 { 231 desc: "append #2", 232 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 233 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 234 }, 235 { 236 desc: "increment", 237 rmw: incRMW(8), 238 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 239 }, 240 } 241 for _, step := range rmwSeq { 242 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 243 if err != nil { 244 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 245 } 246 // Make sure the modified cell returned by the RMW operation has a timestamp. 
247 if row["counter"][0].Timestamp == 0 { 248 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 249 } 250 clearTimestamps(row) 251 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 252 if !testutil.Equal(row, wantRow) { 253 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 254 } 255 } 256 257 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 258 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 259 if err != nil { 260 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 261 } 262 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 263 if err != nil { 264 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 265 } 266 // Get only the correct row back on read. 267 r, err := table.ReadRow(ctx, "issue-723-1") 268 if err != nil { 269 t.Fatalf("Reading row: %v", err) 270 } 271 if r.Key() != "issue-723-1" { 272 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 273 } 274} 275 276func TestIntegration_ArbitraryTimestamps(t *testing.T) { 277 ctx := context.Background() 278 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 279 if err != nil { 280 t.Fatal(err) 281 } 282 defer cleanup() 283 284 // Test arbitrary timestamps more thoroughly. 285 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 286 t.Fatalf("Creating column family: %v", err) 287 } 288 const numVersions = 4 289 mut := NewMutation() 290 for i := 1; i < numVersions; i++ { 291 // Timestamps are used in thousands because the server 292 // only permits that granularity. 
293 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 294 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 295 } 296 if err := table.Apply(ctx, "testrow", mut); err != nil { 297 t.Fatalf("Mutating row: %v", err) 298 } 299 r, err := table.ReadRow(ctx, "testrow") 300 if err != nil { 301 t.Fatalf("Reading row: %v", err) 302 } 303 wantRow := Row{"ts": []ReadItem{ 304 // These should be returned in descending timestamp order. 305 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 306 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 307 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 308 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 309 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 310 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 311 }} 312 if !testutil.Equal(r, wantRow) { 313 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 314 } 315 316 // Do the same read, but filter to the latest two versions. 
317 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 318 if err != nil { 319 t.Fatalf("Reading row: %v", err) 320 } 321 wantRow = Row{"ts": []ReadItem{ 322 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 323 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 324 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 325 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 326 }} 327 if !testutil.Equal(r, wantRow) { 328 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 329 } 330 // Check cell offset / limit 331 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 332 if err != nil { 333 t.Fatalf("Reading row: %v", err) 334 } 335 wantRow = Row{"ts": []ReadItem{ 336 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 337 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 338 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 339 }} 340 if !testutil.Equal(r, wantRow) { 341 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 342 } 343 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) 344 if err != nil { 345 t.Fatalf("Reading row: %v", err) 346 } 347 wantRow = Row{"ts": []ReadItem{ 348 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 349 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 350 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 351 }} 352 if !testutil.Equal(r, wantRow) { 353 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 354 } 355 // Check timestamp range filtering (with truncation) 356 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 357 if err 
!= nil { 358 t.Fatalf("Reading row: %v", err) 359 } 360 wantRow = Row{"ts": []ReadItem{ 361 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 362 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 363 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 364 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 365 }} 366 if !testutil.Equal(r, wantRow) { 367 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 368 } 369 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 370 if err != nil { 371 t.Fatalf("Reading row: %v", err) 372 } 373 wantRow = Row{"ts": []ReadItem{ 374 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 375 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 376 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 377 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 378 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 379 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 380 }} 381 if !testutil.Equal(r, wantRow) { 382 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 383 } 384 // Delete non-existing cells, no such column family in this row 385 // Should not delete anything 386 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 387 t.Fatalf("Creating column family: %v", err) 388 } 389 mut = NewMutation() 390 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 391 if err := table.Apply(ctx, "testrow", mut); err != nil { 392 t.Fatalf("Mutating row: %v", err) 393 } 394 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 395 if err != nil { 396 t.Fatalf("Reading row: %v", 
err) 397 } 398 if !testutil.Equal(r, wantRow) { 399 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 400 } 401 // Delete non-existing cells, no such column in this column family 402 // Should not delete anything 403 mut = NewMutation() 404 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 405 if err := table.Apply(ctx, "testrow", mut); err != nil { 406 t.Fatalf("Mutating row: %v", err) 407 } 408 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 409 if err != nil { 410 t.Fatalf("Reading row: %v", err) 411 } 412 if !testutil.Equal(r, wantRow) { 413 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 414 } 415 // Delete the cell with timestamp 2000 and repeat the last read, 416 // checking that we get ts 3000 and ts 1000. 417 mut = NewMutation() 418 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 419 if err := table.Apply(ctx, "testrow", mut); err != nil { 420 t.Fatalf("Mutating row: %v", err) 421 } 422 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 423 if err != nil { 424 t.Fatalf("Reading row: %v", err) 425 } 426 wantRow = Row{"ts": []ReadItem{ 427 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 428 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 429 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 430 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 431 }} 432 if !testutil.Equal(r, wantRow) { 433 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 434 } 435 436 // Check DeleteCellsInFamily 437 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 438 t.Fatalf("Creating column family: %v", err) 439 } 440 441 mut = NewMutation() 442 mut.Set("status", "start", 2000, []byte("2")) 443 mut.Set("status", "end", 3000, []byte("3")) 
444 mut.Set("ts", "col", 1000, []byte("1")) 445 if err := table.Apply(ctx, "row1", mut); err != nil { 446 t.Fatalf("Mutating row: %v", err) 447 } 448 if err := table.Apply(ctx, "row2", mut); err != nil { 449 t.Fatalf("Mutating row: %v", err) 450 } 451 452 mut = NewMutation() 453 mut.DeleteCellsInFamily("status") 454 if err := table.Apply(ctx, "row1", mut); err != nil { 455 t.Fatalf("Delete cf: %v", err) 456 } 457 458 // ColumnFamily removed 459 r, err = table.ReadRow(ctx, "row1") 460 if err != nil { 461 t.Fatalf("Reading row: %v", err) 462 } 463 wantRow = Row{"ts": []ReadItem{ 464 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 465 }} 466 if !testutil.Equal(r, wantRow) { 467 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 468 } 469 470 // ColumnFamily not removed 471 r, err = table.ReadRow(ctx, "row2") 472 if err != nil { 473 t.Fatalf("Reading row: %v", err) 474 } 475 wantRow = Row{ 476 "ts": []ReadItem{ 477 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 478 }, 479 "status": []ReadItem{ 480 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 481 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 482 }, 483 } 484 if !testutil.Equal(r, wantRow) { 485 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 486 } 487 488 // Check DeleteCellsInColumn 489 mut = NewMutation() 490 mut.Set("status", "start", 2000, []byte("2")) 491 mut.Set("status", "middle", 3000, []byte("3")) 492 mut.Set("status", "end", 1000, []byte("1")) 493 if err := table.Apply(ctx, "row3", mut); err != nil { 494 t.Fatalf("Mutating row: %v", err) 495 } 496 mut = NewMutation() 497 mut.DeleteCellsInColumn("status", "middle") 498 if err := table.Apply(ctx, "row3", mut); err != nil { 499 t.Fatalf("Delete column: %v", err) 500 } 501 r, err = table.ReadRow(ctx, "row3") 502 if err != nil { 503 t.Fatalf("Reading row: %v", err) 504 } 505 wantRow = Row{ 506 "status": 
[]ReadItem{ 507 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 508 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 509 }, 510 } 511 if !testutil.Equal(r, wantRow) { 512 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 513 } 514 mut = NewMutation() 515 mut.DeleteCellsInColumn("status", "start") 516 if err := table.Apply(ctx, "row3", mut); err != nil { 517 t.Fatalf("Delete column: %v", err) 518 } 519 r, err = table.ReadRow(ctx, "row3") 520 if err != nil { 521 t.Fatalf("Reading row: %v", err) 522 } 523 wantRow = Row{ 524 "status": []ReadItem{ 525 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 526 }, 527 } 528 if !testutil.Equal(r, wantRow) { 529 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 530 } 531 mut = NewMutation() 532 mut.DeleteCellsInColumn("status", "end") 533 if err := table.Apply(ctx, "row3", mut); err != nil { 534 t.Fatalf("Delete column: %v", err) 535 } 536 r, err = table.ReadRow(ctx, "row3") 537 if err != nil { 538 t.Fatalf("Reading row: %v", err) 539 } 540 if len(r) != 0 { 541 t.Fatalf("Delete column: got %v, want empty row", r) 542 } 543 // Add same cell after delete 544 mut = NewMutation() 545 mut.Set("status", "end", 1000, []byte("1")) 546 if err := table.Apply(ctx, "row3", mut); err != nil { 547 t.Fatalf("Mutating row: %v", err) 548 } 549 r, err = table.ReadRow(ctx, "row3") 550 if err != nil { 551 t.Fatalf("Reading row: %v", err) 552 } 553 if !testutil.Equal(r, wantRow) { 554 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 555 } 556} 557 558func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 559 ctx := context.Background() 560 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 561 if err != nil { 562 t.Fatal(err) 563 } 564 defer cleanup() 565 566 if err := populatePresidentsGraph(table); err != nil { 567 t.Fatal(err) 568 } 569 570 if err := 
adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 571 t.Fatalf("Creating column family: %v", err) 572 } 573 574 // Do highly concurrent reads/writes. 575 const maxConcurrency = 1000 576 var wg sync.WaitGroup 577 for i := 0; i < maxConcurrency; i++ { 578 wg.Add(1) 579 go func() { 580 defer wg.Done() 581 switch r := rand.Intn(100); { // r ∈ [0,100) 582 case 0 <= r && r < 30: 583 // Do a read. 584 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 585 if err != nil { 586 t.Errorf("Concurrent read: %v", err) 587 } 588 case 30 <= r && r < 100: 589 // Do a write. 590 mut := NewMutation() 591 mut.Set("ts", "col", 1000, []byte("data")) 592 if err := table.Apply(ctx, "testrow", mut); err != nil { 593 t.Errorf("Concurrent write: %v", err) 594 } 595 } 596 }() 597 } 598 wg.Wait() 599} 600 601func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 602 ctx := context.Background() 603 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 604 if err != nil { 605 t.Fatal(err) 606 } 607 defer cleanup() 608 609 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 610 t.Fatalf("Creating column family: %v", err) 611 } 612 613 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 
614 nonsense := []byte("lorem ipsum dolor sit amet, ") 615 fill(bigBytes, nonsense) 616 mut := NewMutation() 617 mut.Set("ts", "col", 1000, bigBytes) 618 if err := table.Apply(ctx, "bigrow", mut); err != nil { 619 t.Fatalf("Big write: %v", err) 620 } 621 r, err := table.ReadRow(ctx, "bigrow") 622 if err != nil { 623 t.Fatalf("Big read: %v", err) 624 } 625 wantRow := Row{"ts": []ReadItem{ 626 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 627 }} 628 if !testutil.Equal(r, wantRow) { 629 t.Fatalf("Big read returned incorrect bytes: %v", r) 630 } 631 632 var wg sync.WaitGroup 633 // Now write 1000 rows, each with 82 KB values, then scan them all. 634 medBytes := make([]byte, 82<<10) 635 fill(medBytes, nonsense) 636 sem := make(chan int, 50) // do up to 50 mutations at a time. 637 for i := 0; i < 1000; i++ { 638 mut := NewMutation() 639 mut.Set("ts", "big-scan", 1000, medBytes) 640 row := fmt.Sprintf("row-%d", i) 641 wg.Add(1) 642 go func() { 643 defer wg.Done() 644 defer func() { <-sem }() 645 sem <- 1 646 if err := table.Apply(ctx, row, mut); err != nil { 647 t.Errorf("Preparing large scan: %v", err) 648 } 649 }() 650 } 651 wg.Wait() 652 n := 0 653 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 654 for _, ris := range r { 655 for _, ri := range ris { 656 n += len(ri.Value) 657 } 658 } 659 return true 660 }, RowFilter(ColumnFilter("big-scan"))) 661 if err != nil { 662 t.Fatalf("Doing large scan: %v", err) 663 } 664 if want := 1000 * len(medBytes); n != want { 665 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 666 } 667 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 
668 rc := 0 669 wantRc := 3 670 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 671 rc++ 672 return true 673 }, LimitRows(int64(wantRc))) 674 if err != nil { 675 t.Fatal(err) 676 } 677 if rc != wantRc { 678 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 679 } 680 681 // Test bulk mutations 682 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 683 t.Fatalf("Creating column family: %v", err) 684 } 685 bulkData := map[string][]string{ 686 "red sox": {"2004", "2007", "2013"}, 687 "patriots": {"2001", "2003", "2004", "2014"}, 688 "celtics": {"1981", "1984", "1986", "2008"}, 689 } 690 var rowKeys []string 691 var muts []*Mutation 692 for row, ss := range bulkData { 693 mut := NewMutation() 694 for _, name := range ss { 695 mut.Set("bulk", name, 1000, []byte("1")) 696 } 697 rowKeys = append(rowKeys, row) 698 muts = append(muts, mut) 699 } 700 status, err := table.ApplyBulk(ctx, rowKeys, muts) 701 if err != nil { 702 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 703 } 704 if status != nil { 705 t.Fatalf("non-nil errors: %v", err) 706 } 707 708 // Read each row back 709 for rowKey, ss := range bulkData { 710 row, err := table.ReadRow(ctx, rowKey) 711 if err != nil { 712 t.Fatalf("Reading a bulk row: %v", err) 713 } 714 var wantItems []ReadItem 715 for _, val := range ss { 716 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 717 } 718 wantRow := Row{"bulk": wantItems} 719 if !testutil.Equal(row, wantRow) { 720 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 721 } 722 } 723 724 // Test bulk write errors. 725 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 
726 badMut := NewMutation() 727 badMut.Set("badfamily", "col", ServerTime, nil) 728 badMut2 := NewMutation() 729 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 730 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 731 if err != nil { 732 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 733 } 734 if status == nil { 735 t.Fatalf("No errors for bad bulk mutation") 736 } else if status[0] == nil || status[1] == nil { 737 t.Fatalf("No error for bad bulk mutation") 738 } 739} 740 741func TestIntegration_Read(t *testing.T) { 742 ctx := context.Background() 743 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 744 if err != nil { 745 t.Fatal(err) 746 } 747 defer cleanup() 748 749 // Insert some data. 750 initialData := map[string][]string{ 751 "wmckinley": {"tjefferson"}, 752 "gwashington": {"jadams"}, 753 "tjefferson": {"gwashington", "jadams", "wmckinley"}, 754 "jadams": {"gwashington", "tjefferson"}, 755 } 756 for row, ss := range initialData { 757 mut := NewMutation() 758 for _, name := range ss { 759 mut.Set("follows", name, 1000, []byte("1")) 760 } 761 if err := table.Apply(ctx, row, mut); err != nil { 762 t.Fatalf("Mutating row %q: %v", row, err) 763 } 764 } 765 766 for _, test := range []struct { 767 desc string 768 rr RowSet 769 filter Filter // may be nil 770 limit ReadOption // may be nil 771 772 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 773 // and join with a comma. 
774 want string 775 }{ 776 { 777 desc: "read all, unfiltered", 778 rr: RowRange{}, 779 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 780 }, 781 { 782 desc: "read with InfiniteRange, unfiltered", 783 rr: InfiniteRange("tjefferson"), 784 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 785 }, 786 { 787 desc: "read with NewRange, unfiltered", 788 rr: NewRange("gargamel", "hubbard"), 789 want: "gwashington-jadams-1", 790 }, 791 { 792 desc: "read with PrefixRange, unfiltered", 793 rr: PrefixRange("jad"), 794 want: "jadams-gwashington-1,jadams-tjefferson-1", 795 }, 796 { 797 desc: "read with SingleRow, unfiltered", 798 rr: SingleRow("wmckinley"), 799 want: "wmckinley-tjefferson-1", 800 }, 801 { 802 desc: "read all, with ColumnFilter", 803 rr: RowRange{}, 804 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 805 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 806 }, 807 { 808 desc: "read all, with ColumnFilter, prefix", 809 rr: RowRange{}, 810 filter: ColumnFilter("j"), // no matches 811 want: "", 812 }, 813 { 814 desc: "read range, with ColumnRangeFilter", 815 rr: RowRange{}, 816 filter: ColumnRangeFilter("follows", "h", "k"), 817 want: "gwashington-jadams-1,tjefferson-jadams-1", 818 }, 819 { 820 desc: "read range from empty, with ColumnRangeFilter", 821 rr: RowRange{}, 822 filter: ColumnRangeFilter("follows", "", "u"), 823 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 824 }, 825 { 826 desc: "read range from start to empty, with ColumnRangeFilter", 827 rr: RowRange{}, 828 filter: ColumnRangeFilter("follows", "h", ""), 829 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 830 }, 831 { 832 
desc: "read with RowKeyFilter", 833 rr: RowRange{}, 834 filter: RowKeyFilter(".*wash.*"), 835 want: "gwashington-jadams-1", 836 }, 837 { 838 desc: "read with RowKeyFilter, prefix", 839 rr: RowRange{}, 840 filter: RowKeyFilter("gwash"), 841 want: "", 842 }, 843 { 844 desc: "read with RowKeyFilter, no matches", 845 rr: RowRange{}, 846 filter: RowKeyFilter(".*xxx.*"), 847 want: "", 848 }, 849 { 850 desc: "read with FamilyFilter, no matches", 851 rr: RowRange{}, 852 filter: FamilyFilter(".*xxx.*"), 853 want: "", 854 }, 855 { 856 desc: "read with ColumnFilter + row limit", 857 rr: RowRange{}, 858 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 859 limit: LimitRows(2), 860 want: "gwashington-jadams-1,jadams-tjefferson-1", 861 }, 862 { 863 desc: "read all, strip values", 864 rr: RowRange{}, 865 filter: StripValueFilter(), 866 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 867 }, 868 { 869 desc: "read with ColumnFilter + row limit + strip values", 870 rr: RowRange{}, 871 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" 872 limit: LimitRows(2), 873 want: "gwashington-jadams-,jadams-tjefferson-", 874 }, 875 { 876 desc: "read with condition, strip values on true", 877 rr: RowRange{}, 878 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 879 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 880 }, 881 { 882 desc: "read with condition, strip values on false", 883 rr: RowRange{}, 884 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 885 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 886 }, 887 { 888 desc: "read with ValueRangeFilter + row 
limit", 889 rr: RowRange{}, 890 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 891 limit: LimitRows(2), 892 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 893 }, 894 { 895 desc: "read with ValueRangeFilter, no match on exclusive end", 896 rr: RowRange{}, 897 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 898 want: "", 899 }, 900 { 901 desc: "read with ValueRangeFilter, no matches", 902 rr: RowRange{}, 903 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 904 want: "", 905 }, 906 { 907 desc: "read with InterleaveFilter, no matches on all filters", 908 rr: RowRange{}, 909 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 910 want: "", 911 }, 912 { 913 desc: "read with InterleaveFilter, no duplicate cells", 914 rr: RowRange{}, 915 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 916 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 917 }, 918 { 919 desc: "read with InterleaveFilter, with duplicate cells", 920 rr: RowRange{}, 921 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 922 want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 923 }, 924 { 925 desc: "read with a RowRangeList and no filter", 926 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 927 want: "gwashington-jadams-1,wmckinley-tjefferson-1", 928 }, 929 { 930 desc: "chain that excludes rows and matches nothing, in a condition", 931 rr: RowRange{}, 932 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 933 want: "", 934 }, 935 { 936 desc: "chain that ends with an interleave that has no match. 
covers #804",
			rr:     RowRange{},
			filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil),
			want:   "",
		},
	} {
		t.Run(test.desc, func(t *testing.T) {
			var opts []ReadOption
			if test.filter != nil {
				opts = append(opts, RowFilter(test.filter))
			}
			if test.limit != nil {
				opts = append(opts, test.limit)
			}
			var elt []string
			err := table.ReadRows(ctx, test.rr, func(r Row) bool {
				for _, ris := range r {
					for _, ri := range ris {
						elt = append(elt, formatReadItem(ri))
					}
				}
				return true
			}, opts...)
			if err != nil {
				t.Fatal(err)
			}
			if got := strings.Join(elt, ","); got != test.want {
				t.Fatalf("got %q\nwant %q", got, test.want)
			}
		})
	}
}

// TestIntegration_SampleRowKeys writes a few rows into the "follows" family
// and checks that SampleRowKeys succeeds and returns at least one key.
func TestIntegration_SampleRowKeys(t *testing.T) {
	ctx := context.Background()
	_, _, table, _, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Insert some data.
	initialData := map[string][]string{
		"wmckinley11":   {"tjefferson11"},
		"gwashington77": {"jadams77"},
		"tjefferson0":   {"gwashington0", "jadams0"},
	}

	for row, ss := range initialData {
		mut := NewMutation()
		for _, name := range ss {
			mut.Set("follows", name, 1000, []byte("1"))
		}
		if err := table.Apply(ctx, row, mut); err != nil {
			t.Fatalf("Mutating row %q: %v", row, err)
		}
	}

	// Reuse the test's ctx rather than minting a second background context.
	sampleKeys, err := table.SampleRowKeys(ctx)
	if err != nil {
		t.Fatalf("SampleRowKeys: %v", err)
	}
	if len(sampleKeys) == 0 {
		t.Error("SampleRowKeys length 0")
	}
}

// TestIntegration_Admin exercises the table admin client: creating, listing
// and deleting tables, creating a table from a TableConf, reading table info,
// creating a column family, and dropping row ranges by prefix.
func TestIntegration_Admin(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	timeout := 2 * time.Second
	if testEnv.Config().UseProd {
		timeout = 5 * time.Minute
	}
	// Registered before the deferred table deletions below, so (LIFO) the
	// context is still live when they run.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	iAdminClient, err := testEnv.NewInstanceAdminClient()
	if err != nil {
		t.Fatalf("NewInstanceAdminClient: %v", err)
	}
	// The instance admin checks only run when the environment supplies a client.
	if iAdminClient != nil {
		defer iAdminClient.Close()

		iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
		if err != nil {
			// Fatal: iInfo must not be dereferenced after a failed call.
			t.Fatalf("InstanceInfo: %v", err)
		}
		if iInfo.Name != adminClient.instance {
			t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
		}
	}

	// list returns the sorted table names, failing the test on error.
	list := func() []string {
		tbls, err := adminClient.Tables(ctx)
		if err != nil {
			t.Fatalf("Fetching list of tables: %v", err)
		}
		sort.Strings(tbls)
		return tbls
	}
	// containsAll reports whether every element of want is present in got.
	containsAll := func(got, want []string) bool {
		gotSet := make(map[string]bool)

		for _, s := range got {
			gotSet[s] = true
		}
		for _, s := range want {
			if !gotSet[s] {
				return false
			}
		}
		return true
	}

	// Deletions are scheduled before creation so that a table is removed even
	// if a later step fails.
	defer adminClient.DeleteTable(ctx, "mytable")

	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
		t.Fatalf("Creating table: %v", err)
	}

	defer adminClient.DeleteTable(ctx, "myothertable")

	if err := adminClient.CreateTable(ctx, "myothertable"); err != nil {
		t.Fatalf("Creating table: %v", err)
	}

	if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) {
		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
	}

	must(adminClient.WaitForReplication(ctx, "mytable"))

	if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil {
		t.Fatalf("Deleting table: %v", err)
	}
	tables := list()
	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
	}
	if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) {
		t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted)
	}

	tblConf := TableConf{
		TableID: "conftable",
		Families: map[string]GCPolicy{
			"fam1": MaxVersionsPolicy(1),
			"fam2": MaxVersionsPolicy(2),
		},
	}
	if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
		t.Fatalf("Creating table from TableConf: %v", err)
	}
	defer adminClient.DeleteTable(ctx, tblConf.TableID)

	tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID)
	if err != nil {
		t.Fatalf("Getting table info: %v", err)
	}
	sort.Strings(tblInfo.Families)
	wantFams := []string{"fam1", "fam2"}
	if !testutil.Equal(tblInfo.Families, wantFams) {
		t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams)
	}

	// Populate mytable and drop row ranges
	if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}

	client, err := testEnv.NewClient()
	if err != nil {
		t.Fatalf("NewClient: %v", err)
	}
	defer client.Close()

	tbl := client.Open("mytable")

	// Five rows per prefix: a-0..a-4, b-0..b-4, c-0..c-4.
	prefixes := []string{"a", "b", "c"}
	for _, prefix := range prefixes {
		for i := 0; i < 5; i++ {
			mut := NewMutation()
			mut.Set("cf", "col", 1000, []byte("1"))
			if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil {
				t.Fatalf("Mutating row: %v", err)
			}
		}
	}

	// "x" matches no rows; dropping it must still succeed.
	if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil {
		t.Errorf("DropRowRange a: %v", err)
	}
	if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil {
		t.Errorf("DropRowRange c: %v", err)
	}
	if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil {
		t.Errorf("DropRowRange x: %v", err)
	}

	// Only the five "b" rows should remain.
	var gotRowCount int
	must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool {
		gotRowCount++
		if !strings.HasPrefix(row.Key(), "b") {
			t.Errorf("Invalid row after dropping range: %v", row)
		}
		return true
	}))
	if gotRowCount != 5 {
		t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5)
	}
}

// TestIntegration_AdminSnapshot covers the snapshot lifecycle: create a
// snapshot of a table, list it, fetch its info, restore a table from it, and
// delete it. It is skipped unless the environment is configured with UseProd.
func TestIntegration_AdminSnapshot(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	if !testEnv.Config().UseProd {
		t.Skip("emulator doesn't support snapshots")
	}

	// Past the skip above UseProd is always true, so only the production
	// timeout applies.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	table := testEnv.Config().Table
	cluster := testEnv.Config().Cluster

	// list drains the snapshot iterator for cluster into a slice.
	list := func(cluster string) ([]*SnapshotInfo, error) {
		var infos []*SnapshotInfo

		it := adminClient.Snapshots(ctx, cluster)
		for {
			s, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				return nil, err
			}
			infos = append(infos, s)
		}
		// Return an explicit nil: the loop's err is scoped to the loop, so
		// naming err here would return the enclosing test's variable.
		return infos, nil
	}

	// Delete the table at the end of the test. Schedule ahead of time
	// in case the client fails
	defer adminClient.DeleteTable(ctx, table)

	if err := adminClient.CreateTable(ctx, table); err != nil {
		t.Fatalf("Creating table: %v", err)
	}

	// Precondition: no snapshots
	snapshots, err := list(cluster)
	if err != nil {
		t.Fatalf("Initial snapshot list: %v", err)
	}
	if got, want := len(snapshots), 0; got != want {
		t.Fatalf("Initial snapshot list len: %d, want: %d", got, want)
	}

	// Create snapshot
	defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot")

	if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil {
		t.Fatalf("Creating snapshot: %v", err)
	}

	// List snapshot
	snapshots, err = list(cluster)
	if err != nil {
		t.Fatalf("Listing snapshots: %v", err)
	}
	if got, want := len(snapshots), 1; got != want {
		t.Fatalf("Listing snapshot count: %d, want: %d", got, want)
	}
	if got, want := snapshots[0].Name, "mysnapshot"; got != want {
		t.Fatalf("Snapshot name: %s, want: %s", got, want)
	}
	if got, want := snapshots[0].SourceTable, table; got != want {
		t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want)
	}
	// The delete time should be the create time plus the requested 5h TTL,
	// within one minute.
	if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
		t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want)
	}

	// Get snapshot
	snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot")
	if err != nil {
		t.Fatalf("SnapshotInfo: %v", err)
	}
	if got, want := *snapshot, *snapshots[0]; got != want {
		t.Fatalf("SnapshotInfo: %v, want: %v", got, want)
	}

	// Restore
	restoredTable := table + "-restored"
	defer adminClient.DeleteTable(ctx, restoredTable)
	if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil {
		t.Fatalf("CreateTableFromSnapshot: %v", err)
	}
	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
		t.Fatalf("Restored TableInfo: %v", err)
	}

	// Delete snapshot
	if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil {
		t.Fatalf("DeleteSnapshot: %v", err)
	}
	snapshots, err = list(cluster)
	if err != nil {
		t.Fatalf("List after Delete: %v", err)
	}
	if got, want := len(snapshots), 0; got != want {
		t.Fatalf("List after delete len: %d, want: %d", got, want)
	}
}

// TestIntegration_Granularity creates a table and, through a raw
// ModifyColumnFamilies call, checks that the returned table reports
// millisecond timestamp granularity.
func TestIntegration_Granularity(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	timeout := 2 * time.Second
	if testEnv.Config().UseProd {
		timeout = 5 * time.Minute
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	// list returns the sorted table names, failing the test on error.
	list := func() []string {
		tbls, err := adminClient.Tables(ctx)
		if err != nil {
			t.Fatalf("Fetching list of tables: %v", err)
		}
		sort.Strings(tbls)
		return tbls
	}
	// containsAll reports whether every element of want is present in got.
	containsAll := func(got, want []string) bool {
		gotSet := make(map[string]bool)

		for _, s := range got {
			gotSet[s] = true
		}
		for _, s := range want {
			if !gotSet[s] {
				return false
			}
		}
		return true
	}

	defer adminClient.DeleteTable(ctx, "mytable")

	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
		t.Fatalf("Creating table: %v", err)
	}

	tables := list()
	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
	}

	// calling ModifyColumnFamilies to check the granularity of table
	prefix := adminClient.instancePrefix()
	req := &btapb.ModifyColumnFamiliesRequest{
		Name: prefix + "/tables/" + "mytable",
		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
			Id:  "cf",
			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
		}},
	}
	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
	if err != nil {
		t.Fatalf("Creating column family: %v", err)
	}
	if got, want := table.Granularity, btapb.Table_MILLIS; got != want {
		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", got, want)
	}
}

// TestIntegration_InstanceAdminClient_AppProfile creates an app profile, reads
// it back, lists profiles, applies a table of updates, and deletes the profile.
// It returns early when the environment provides no instance admin client.
func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	timeout := 2 * time.Second
	if testEnv.Config().UseProd {
		timeout = 5 * time.Minute
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	iAdminClient, err := testEnv.NewInstanceAdminClient()
	if err != nil {
		t.Fatalf("NewInstanceAdminClient: %v", err)
	}

	if iAdminClient == nil {
		return
	}

	defer iAdminClient.Close()
	profile := ProfileConf{
		ProfileID:     "app_profile1",
		InstanceID:    adminClient.instance,
		ClusterID:     testEnv.Config().Cluster,
		Description:   "creating new app profile 1",
		RoutingPolicy: SingleClusterRouting,
	}

	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
	if err != nil {
		t.Fatalf("Creating app profile: %v", err)
	}

	// Safety net: if an assertion below aborts the test before the explicit
	// delete at the end, still try to remove the profile so it does not leak
	// into later runs. The error is deliberately ignored (best effort).
	profileDeleted := false
	defer func() {
		if !profileDeleted {
			iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1")
		}
	}()

	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
	if err != nil {
		t.Fatalf("Get app profile: %v", err)
	}

	if !proto.Equal(createdProfile, gotProfile) {
		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
	}

	// list drains the app profile iterator for instanceID into a slice.
	list := func(instanceID string) ([]*btapb.AppProfile, error) {
		var profiles []*btapb.AppProfile

		it := iAdminClient.ListAppProfiles(ctx, instanceID)
		for {
			s, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				return nil, err
			}
			profiles = append(profiles, s)
		}
		// Return an explicit nil: the loop's err is scoped to the loop, so
		// naming err here would return the enclosing test's variable.
		return profiles, nil
	}

	profiles, err := list(adminClient.instance)
	if err != nil {
		t.Fatalf("List app profile: %v", err)
	}

	if got, want := len(profiles), 1; got != want {
		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
	}

	for _, test := range []struct {
		desc   string
		uattrs ProfileAttrsToUpdate
		want   *btapb.AppProfile // nil means error
	}{
		{
			desc:   "empty update",
			uattrs: ProfileAttrsToUpdate{},
			want:   nil,
		},
		{
			desc:   "empty description update",
			uattrs: ProfileAttrsToUpdate{Description: ""},
			want: &btapb.AppProfile{
				Name:          gotProfile.Name,
				Description:   "",
				RoutingPolicy: gotProfile.RoutingPolicy,
				Etag:          gotProfile.Etag},
		},
		{
			desc: "routing update",
			uattrs: ProfileAttrsToUpdate{
				RoutingPolicy: SingleClusterRouting,
				ClusterID:     testEnv.Config().Cluster,
			},
			want: &btapb.AppProfile{
				Name:        gotProfile.Name,
				Description: "",
				Etag:        gotProfile.Etag,
				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
						ClusterId: testEnv.Config().Cluster,
					}},
			},
		},
	} {
		err := iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs)
		if err != nil {
			if test.want != nil {
				t.Errorf("%s: %v", test.desc, err)
			}
			continue
		}
		if test.want == nil {
			t.Errorf("%s: got nil, want error", test.desc)
			continue
		}

		got, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1")
		if err != nil {
			t.Fatalf("%s: GetAppProfile: %v", test.desc, err)
		}

		// Report the freshly fetched profile, not the pre-update snapshot.
		if !proto.Equal(got, test.want) {
			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, got, test.want)
		}
	}

	if err := iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1"); err != nil {
		t.Fatalf("Delete app profile: %v", err)
	}
	profileDeleted = true
}

// TestIntegration_InstanceUpdate checks the instance's name and display name,
// updates the cluster's node count, and verifies the count reported afterwards.
// It returns early when the environment provides no instance admin client.
func TestIntegration_InstanceUpdate(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	timeout := 2 * time.Second
	if testEnv.Config().UseProd {
		timeout = 5 * time.Minute
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}

	defer adminClient.Close()

	iAdminClient, err := testEnv.NewInstanceAdminClient()
	if err != nil {
		t.Fatalf("NewInstanceAdminClient: %v", err)
	}

	if iAdminClient == nil {
		return
	}

	defer iAdminClient.Close()

	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
	if err != nil {
		// Fatal: iInfo must not be dereferenced after a failed call.
		t.Fatalf("InstanceInfo: %v", err)
	}

	if iInfo.Name != adminClient.instance {
		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
	}

	if iInfo.DisplayName != adminClient.instance {
		t.Errorf("InstanceInfo returned display name %#v, want %#v", iInfo.DisplayName, adminClient.instance)
	}

	const numNodes = 4
	// update cluster nodes
	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
		t.Errorf("UpdateCluster: %v", err)
	}

	// get cluster after updating
	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
	if err != nil {
		// Fatal: cis must not be dereferenced after a failed call.
		t.Fatalf("GetCluster %v", err)
	}
	if cis.ServeNodes != int(numNodes) {
		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
	}
}

// setupIntegration builds the fixture shared by the data-path tests: an
// integration environment, a data client, an admin client, and the configured
// table with a "follows" column family.
//
// It returns the data client, the admin client, a handle to the table, the
// table name, and a cleanup function that deletes the table and closes both
// clients and the environment. On error every resource acquired so far is
// released and cleanup is nil.
func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		return nil, nil, nil, "", nil, err
	}

	var timeout time.Duration
	if testEnv.Config().UseProd {
		timeout = 10 * time.Minute
		t.Logf("Running test against production")
	} else {
		timeout = 1 * time.Minute
		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
	}
	// This context bounds the setup calls only. It is cancelled when this
	// function returns, so the cleanup closure must not capture it.
	setupCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	client, err := testEnv.NewClient()
	if err != nil {
		testEnv.Close()
		return nil, nil, nil, "", nil, err
	}

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		client.Close()
		testEnv.Close()
		return nil, nil, nil, "", nil, err
	}

	// closeAll releases the clients and the environment.
	closeAll := func() {
		client.Close()
		adminClient.Close()
		testEnv.Close()
	}

	tableName = testEnv.Config().Table
	if err := adminClient.CreateTable(setupCtx, tableName); err != nil {
		closeAll()
		return nil, nil, nil, "", nil, err
	}
	if err := adminClient.CreateColumnFamily(setupCtx, tableName, "follows"); err != nil {
		// Best effort: do not leave the half-configured table behind.
		adminClient.DeleteTable(setupCtx, tableName)
		closeAll()
		return nil, nil, nil, "", nil, err
	}

	return client, adminClient, client.Open(tableName), tableName, func() {
		// Use a fresh context derived from the caller's ctx: the setup
		// context is already cancelled by the time cleanup runs.
		cleanupCtx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		if err := adminClient.DeleteTable(cleanupCtx, tableName); err != nil {
			t.Logf("Deleting table %q during cleanup: %v", tableName, err)
		}
		closeAll()
	}, nil
}

// formatReadItem renders a ReadItem as "<row>-<qualifier>-<value>", where the
// qualifier is the part of the column name after the first ':'. If the column
// contains no ':', the whole column name is used.
func formatReadItem(ri ReadItem) string {
	// Use the column qualifier only to make the test data briefer.
	col := ri.Column[strings.Index(ri.Column, ":")+1:]
	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
}

// fill overwrites all of b with repeated copies of sub; the final copy is
// truncated when len(b) is not a multiple of len(sub). It is a no-op when sub
// is empty, because nothing could ever be copied.
func fill(b, sub []byte) {
	if len(sub) == 0 {
		return
	}
	for len(b) > 0 {
		// copy writes min(len(b), len(sub)) bytes, which handles the tail.
		n := copy(b, sub)
		b = b[n:]
	}
}

// clearTimestamps zeroes the Timestamp of every ReadItem in r, in place.
func clearTimestamps(r Row) {
	for _, ris := range r {
		for i := range ris {
			ris[i].Timestamp = 0
		}
	}
}