1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "flag" 22 "fmt" 23 "math" 24 "math/rand" 25 "sort" 26 "strings" 27 "sync" 28 "testing" 29 "time" 30 31 "cloud.google.com/go/internal/testutil" 32 "github.com/golang/protobuf/proto" 33 "google.golang.org/api/iterator" 34 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 35) 36 37var presidentsSocialGraph = map[string][]string{ 38 "wmckinley": {"tjefferson"}, 39 "gwashington": {"jadams"}, 40 "tjefferson": {"gwashington", "jadams"}, 41 "jadams": {"gwashington", "tjefferson"}, 42} 43 44func populatePresidentsGraph(table *Table) error { 45 ctx := context.Background() 46 for row, ss := range presidentsSocialGraph { 47 mut := NewMutation() 48 for _, name := range ss { 49 mut.Set("follows", name, 1000, []byte("1")) 50 } 51 if err := table.Apply(ctx, row, mut); err != nil { 52 return fmt.Errorf("Mutating row %q: %v", row, err) 53 } 54 } 55 return nil 56} 57 58var instanceToCreate string 59var instanceToCreateZone string 60var instanceToCreateZone2 string 61 62func init() { 63 // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. 64 flag.StringVar(&instanceToCreate, "it.instance-to-create", "", 65 "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.") 66 flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b", 67 "The zone in which to create the new test instance.") 68 flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", 69 "The zone in which to create a second cluster in the test instance.") 70} 71 72func TestIntegration_ConditionalMutations(t *testing.T) { 73 ctx := context.Background() 74 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 75 if err != nil { 76 t.Fatal(err) 77 } 78 defer cleanup() 79 80 if err := populatePresidentsGraph(table); err != nil { 81 t.Fatal(err) 82 } 83 84 // Do a conditional mutation with a complex filter. 85 mutTrue := NewMutation() 86 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 87 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 88 mut := NewCondMutation(filter, mutTrue, nil) 89 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 90 t.Fatalf("Conditionally mutating row: %v", err) 91 } 92 // Do a second condition mutation with a filter that does not match, 93 // and thus no changes should be made. 94 mutTrue = NewMutation() 95 mutTrue.DeleteRow() 96 filter = ColumnFilter("snoop.dogg") 97 mut = NewCondMutation(filter, mutTrue, nil) 98 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 99 t.Fatalf("Conditionally mutating row: %v", err) 100 } 101 102 // Fetch a row. 103 row, err := table.ReadRow(ctx, "jadams") 104 if err != nil { 105 t.Fatalf("Reading a row: %v", err) 106 } 107 wantRow := Row{ 108 "follows": []ReadItem{ 109 {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 110 {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 111 }, 112 } 113 if !testutil.Equal(row, wantRow) { 114 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 115 } 116} 117 118func TestIntegration_PartialReadRows(t *testing.T) { 119 ctx := context.Background() 120 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 121 if err != nil { 122 t.Fatal(err) 123 } 124 defer cleanup() 125 126 if err := populatePresidentsGraph(table); err != nil { 127 t.Fatal(err) 128 } 129 130 // Do a scan and stop part way through. 131 // Verify that the ReadRows callback doesn't keep running. 132 stopped := false 133 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 134 if r.Key() < "h" { 135 return true 136 } 137 if !stopped { 138 stopped = true 139 return false 140 } 141 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 142 return false 143 }) 144 if err != nil { 145 t.Fatalf("Partial ReadRows: %v", err) 146 } 147} 148 149func TestIntegration_ReadRowList(t *testing.T) { 150 ctx := context.Background() 151 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 152 if err != nil { 153 t.Fatal(err) 154 } 155 defer cleanup() 156 157 if err := populatePresidentsGraph(table); err != nil { 158 t.Fatal(err) 159 } 160 161 // Read a RowList 162 var elt []string 163 keys := RowList{"wmckinley", "gwashington", "jadams"} 164 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 165 err = table.ReadRows(ctx, keys, func(r Row) bool { 166 for _, ris := range r { 167 for _, ri := range ris { 168 elt = append(elt, formatReadItem(ri)) 169 } 170 } 171 return true 172 }) 173 if err != nil { 174 t.Fatalf("read RowList: %v", err) 175 } 176 177 if got := strings.Join(elt, ","); got != want { 178 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 179 } 180} 181 182func TestIntegration_DeleteRow(t *testing.T) { 183 ctx := context.Background() 184 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 185 if err != nil { 186 t.Fatal(err) 187 } 188 defer cleanup() 189 190 if err := populatePresidentsGraph(table); err != nil { 191 t.Fatal(err) 192 } 193 194 // Delete a row and check it goes away. 195 mut := NewMutation() 196 mut.DeleteRow() 197 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 198 t.Fatalf("Apply DeleteRow: %v", err) 199 } 200 row, err := table.ReadRow(ctx, "wmckinley") 201 if err != nil { 202 t.Fatalf("Reading a row after DeleteRow: %v", err) 203 } 204 if len(row) != 0 { 205 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 206 } 207} 208 209func TestIntegration_ReadModifyWrite(t *testing.T) { 210 ctx := context.Background() 211 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 212 if err != nil { 213 t.Fatal(err) 214 } 215 defer cleanup() 216 217 if err := populatePresidentsGraph(table); err != nil { 218 t.Fatal(err) 219 } 220 221 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 222 t.Fatalf("Creating column family: %v", err) 223 } 224 225 appendRMW := func(b []byte) *ReadModifyWrite { 226 rmw := NewReadModifyWrite() 227 rmw.AppendValue("counter", "likes", b) 228 return rmw 229 } 230 incRMW := func(n int64) *ReadModifyWrite { 231 rmw := NewReadModifyWrite() 232 rmw.Increment("counter", "likes", n) 233 return rmw 234 } 235 rmwSeq := []struct { 236 desc string 237 rmw *ReadModifyWrite 238 want []byte 239 }{ 240 { 241 desc: "append #1", 242 rmw: appendRMW([]byte{0, 0, 0}), 243 want: []byte{0, 0, 0}, 244 }, 245 { 246 desc: "append #2", 247 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 248 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 249 }, 250 { 251 desc: "increment", 252 rmw: incRMW(8), 253 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 254 }, 255 } 256 for _, step := range rmwSeq { 257 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 258 if err != nil { 259 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 260 } 261 // Make sure the modified cell returned by the RMW operation has a timestamp. 262 if row["counter"][0].Timestamp == 0 { 263 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 264 } 265 clearTimestamps(row) 266 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 267 if !testutil.Equal(row, wantRow) { 268 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 269 } 270 } 271 272 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 273 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 274 if err != nil { 275 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 276 } 277 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 278 if err != nil { 279 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 280 } 281 // Get only the correct row back on read. 282 r, err := table.ReadRow(ctx, "issue-723-1") 283 if err != nil { 284 t.Fatalf("Reading row: %v", err) 285 } 286 if r.Key() != "issue-723-1" { 287 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 288 } 289} 290 291func TestIntegration_ArbitraryTimestamps(t *testing.T) { 292 ctx := context.Background() 293 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 294 if err != nil { 295 t.Fatal(err) 296 } 297 defer cleanup() 298 299 // Test arbitrary timestamps more thoroughly. 300 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 301 t.Fatalf("Creating column family: %v", err) 302 } 303 const numVersions = 4 304 mut := NewMutation() 305 for i := 1; i < numVersions; i++ { 306 // Timestamps are used in thousands because the server 307 // only permits that granularity. 308 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 309 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 310 } 311 if err := table.Apply(ctx, "testrow", mut); err != nil { 312 t.Fatalf("Mutating row: %v", err) 313 } 314 r, err := table.ReadRow(ctx, "testrow") 315 if err != nil { 316 t.Fatalf("Reading row: %v", err) 317 } 318 wantRow := Row{"ts": []ReadItem{ 319 // These should be returned in descending timestamp order. 320 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 321 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 322 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 323 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 324 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 325 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 326 }} 327 if !testutil.Equal(r, wantRow) { 328 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 329 } 330 331 // Do the same read, but filter to the latest two versions. 332 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 333 if err != nil { 334 t.Fatalf("Reading row: %v", err) 335 } 336 wantRow = Row{"ts": []ReadItem{ 337 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 338 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 339 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 340 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 341 }} 342 if !testutil.Equal(r, wantRow) { 343 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 344 } 345 // Check cell offset / limit 346 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 347 if err != nil { 348 t.Fatalf("Reading row: %v", err) 349 } 350 wantRow = Row{"ts": []ReadItem{ 351 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 352 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 353 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 354 }} 355 if !testutil.Equal(r, wantRow) { 356 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 357 } 358 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) 359 if err != nil { 360 t.Fatalf("Reading row: %v", err) 361 } 362 wantRow = Row{"ts": []ReadItem{ 363 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 364 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 365 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 366 }} 367 if !testutil.Equal(r, wantRow) { 368 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 369 } 370 // Check timestamp range filtering (with truncation) 371 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 372 if err != nil { 373 t.Fatalf("Reading row: %v", err) 374 } 375 wantRow = Row{"ts": []ReadItem{ 376 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 377 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 378 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 379 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 380 }} 381 if !testutil.Equal(r, wantRow) { 382 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 383 } 384 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 385 if err != nil { 386 t.Fatalf("Reading row: %v", err) 387 } 388 wantRow = Row{"ts": []ReadItem{ 389 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 390 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 391 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 392 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 393 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 394 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 395 }} 396 if !testutil.Equal(r, wantRow) { 397 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 398 } 399 // Delete non-existing cells, no such column family in this row 400 // Should not delete anything 401 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 402 t.Fatalf("Creating column family: %v", err) 403 } 404 mut = NewMutation() 405 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 406 if err := table.Apply(ctx, "testrow", mut); err != nil { 407 t.Fatalf("Mutating row: %v", err) 408 } 409 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 410 if err != nil { 411 t.Fatalf("Reading row: %v", err) 412 } 413 if !testutil.Equal(r, wantRow) { 414 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 415 } 416 // Delete non-existing cells, no such column in this column family 417 // Should not delete anything 418 mut = NewMutation() 419 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 420 if err := table.Apply(ctx, "testrow", mut); err != nil { 421 t.Fatalf("Mutating row: %v", err) 422 } 423 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 424 if err != nil { 425 t.Fatalf("Reading row: %v", err) 426 } 427 if !testutil.Equal(r, wantRow) { 428 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 429 } 430 // Delete the cell with timestamp 2000 and repeat the last read, 431 // checking that we get ts 3000 and ts 1000. 432 mut = NewMutation() 433 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 434 if err := table.Apply(ctx, "testrow", mut); err != nil { 435 t.Fatalf("Mutating row: %v", err) 436 } 437 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 438 if err != nil { 439 t.Fatalf("Reading row: %v", err) 440 } 441 wantRow = Row{"ts": []ReadItem{ 442 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 443 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 444 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 445 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 446 }} 447 if !testutil.Equal(r, wantRow) { 448 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 449 } 450 451 // Check DeleteCellsInFamily 452 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 453 t.Fatalf("Creating column family: %v", err) 454 } 455 456 mut = NewMutation() 457 mut.Set("status", "start", 2000, []byte("2")) 458 mut.Set("status", "end", 3000, []byte("3")) 459 mut.Set("ts", "col", 1000, []byte("1")) 460 if err := table.Apply(ctx, "row1", mut); err != nil { 461 t.Fatalf("Mutating row: %v", err) 462 } 463 if err := table.Apply(ctx, "row2", mut); err != nil { 464 t.Fatalf("Mutating row: %v", err) 465 } 466 467 mut = NewMutation() 468 mut.DeleteCellsInFamily("status") 469 if err := table.Apply(ctx, "row1", mut); err != nil { 470 t.Fatalf("Delete cf: %v", err) 471 } 472 473 // ColumnFamily removed 474 r, err = table.ReadRow(ctx, "row1") 475 if err != nil { 476 t.Fatalf("Reading row: %v", err) 477 } 478 wantRow = Row{"ts": []ReadItem{ 479 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 480 }} 481 if !testutil.Equal(r, wantRow) { 482 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 483 } 484 485 // ColumnFamily not removed 486 r, err = table.ReadRow(ctx, "row2") 487 if err != nil { 488 t.Fatalf("Reading row: %v", err) 489 } 490 wantRow = Row{ 491 "ts": []ReadItem{ 492 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 493 }, 494 "status": []ReadItem{ 495 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 496 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 497 }, 498 } 499 if !testutil.Equal(r, wantRow) { 500 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 501 } 502 503 // Check DeleteCellsInColumn 504 mut = NewMutation() 505 mut.Set("status", "start", 2000, []byte("2")) 506 mut.Set("status", "middle", 3000, []byte("3")) 507 mut.Set("status", "end", 1000, []byte("1")) 508 if err := table.Apply(ctx, "row3", mut); err != nil { 509 t.Fatalf("Mutating row: %v", err) 510 } 511 mut = NewMutation() 512 mut.DeleteCellsInColumn("status", "middle") 513 if err := table.Apply(ctx, "row3", mut); err != nil { 514 t.Fatalf("Delete column: %v", err) 515 } 516 r, err = table.ReadRow(ctx, "row3") 517 if err != nil { 518 t.Fatalf("Reading row: %v", err) 519 } 520 wantRow = Row{ 521 "status": []ReadItem{ 522 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 523 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 524 }, 525 } 526 if !testutil.Equal(r, wantRow) { 527 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 528 } 529 mut = NewMutation() 530 mut.DeleteCellsInColumn("status", "start") 531 if err := table.Apply(ctx, "row3", mut); err != nil { 532 t.Fatalf("Delete column: %v", err) 533 } 534 r, err = table.ReadRow(ctx, "row3") 535 if err != nil { 536 t.Fatalf("Reading row: %v", err) 537 } 538 wantRow = Row{ 539 "status": []ReadItem{ 540 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 541 }, 542 } 543 if !testutil.Equal(r, wantRow) { 544 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 545 } 546 mut = NewMutation() 547 mut.DeleteCellsInColumn("status", "end") 548 if err := table.Apply(ctx, "row3", mut); err != nil { 549 t.Fatalf("Delete column: %v", err) 550 } 551 r, err = table.ReadRow(ctx, "row3") 552 if err != nil { 553 t.Fatalf("Reading row: %v", err) 554 } 555 if len(r) != 0 { 556 t.Fatalf("Delete column: got %v, want empty row", r) 557 } 558 // Add same cell after delete 559 mut = NewMutation() 560 mut.Set("status", "end", 1000, []byte("1")) 561 if err := table.Apply(ctx, "row3", mut); err != nil { 562 t.Fatalf("Mutating row: %v", err) 563 } 564 r, err = table.ReadRow(ctx, "row3") 565 if err != nil { 566 t.Fatalf("Reading row: %v", err) 567 } 568 if !testutil.Equal(r, wantRow) { 569 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 570 } 571} 572 573func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 574 ctx := context.Background() 575 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 576 if err != nil { 577 t.Fatal(err) 578 } 579 defer cleanup() 580 581 if err := populatePresidentsGraph(table); err != nil { 582 t.Fatal(err) 583 } 584 585 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 586 t.Fatalf("Creating column family: %v", err) 587 } 588 589 // Do highly concurrent reads/writes. 590 const maxConcurrency = 1000 591 var wg sync.WaitGroup 592 for i := 0; i < maxConcurrency; i++ { 593 wg.Add(1) 594 go func() { 595 defer wg.Done() 596 switch r := rand.Intn(100); { // r ∈ [0,100) 597 case 0 <= r && r < 30: 598 // Do a read. 599 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 600 if err != nil { 601 t.Errorf("Concurrent read: %v", err) 602 } 603 case 30 <= r && r < 100: 604 // Do a write. 605 mut := NewMutation() 606 mut.Set("ts", "col", 1000, []byte("data")) 607 if err := table.Apply(ctx, "testrow", mut); err != nil { 608 t.Errorf("Concurrent write: %v", err) 609 } 610 } 611 }() 612 } 613 wg.Wait() 614} 615 616func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 617 ctx := context.Background() 618 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 619 if err != nil { 620 t.Fatal(err) 621 } 622 defer cleanup() 623 624 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 625 t.Fatalf("Creating column family: %v", err) 626 } 627 628 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 629 nonsense := []byte("lorem ipsum dolor sit amet, ") 630 fill(bigBytes, nonsense) 631 mut := NewMutation() 632 mut.Set("ts", "col", 1000, bigBytes) 633 if err := table.Apply(ctx, "bigrow", mut); err != nil { 634 t.Fatalf("Big write: %v", err) 635 } 636 r, err := table.ReadRow(ctx, "bigrow") 637 if err != nil { 638 t.Fatalf("Big read: %v", err) 639 } 640 wantRow := Row{"ts": []ReadItem{ 641 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 642 }} 643 if !testutil.Equal(r, wantRow) { 644 t.Fatalf("Big read returned incorrect bytes: %v", r) 645 } 646 647 var wg sync.WaitGroup 648 // Now write 1000 rows, each with 82 KB values, then scan them all. 649 medBytes := make([]byte, 82<<10) 650 fill(medBytes, nonsense) 651 sem := make(chan int, 50) // do up to 50 mutations at a time. 652 for i := 0; i < 1000; i++ { 653 mut := NewMutation() 654 mut.Set("ts", "big-scan", 1000, medBytes) 655 row := fmt.Sprintf("row-%d", i) 656 wg.Add(1) 657 go func() { 658 defer wg.Done() 659 defer func() { <-sem }() 660 sem <- 1 661 if err := table.Apply(ctx, row, mut); err != nil { 662 t.Errorf("Preparing large scan: %v", err) 663 } 664 }() 665 } 666 wg.Wait() 667 n := 0 668 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 669 for _, ris := range r { 670 for _, ri := range ris { 671 n += len(ri.Value) 672 } 673 } 674 return true 675 }, RowFilter(ColumnFilter("big-scan"))) 676 if err != nil { 677 t.Fatalf("Doing large scan: %v", err) 678 } 679 if want := 1000 * len(medBytes); n != want { 680 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 681 } 682 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 683 rc := 0 684 wantRc := 3 685 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 686 rc++ 687 return true 688 }, LimitRows(int64(wantRc))) 689 if err != nil { 690 t.Fatal(err) 691 } 692 if rc != wantRc { 693 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 694 } 695 696 // Test bulk mutations 697 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 698 t.Fatalf("Creating column family: %v", err) 699 } 700 bulkData := map[string][]string{ 701 "red sox": {"2004", "2007", "2013"}, 702 "patriots": {"2001", "2003", "2004", "2014"}, 703 "celtics": {"1981", "1984", "1986", "2008"}, 704 } 705 var rowKeys []string 706 var muts []*Mutation 707 for row, ss := range bulkData { 708 mut := NewMutation() 709 for _, name := range ss { 710 mut.Set("bulk", name, 1000, []byte("1")) 711 } 712 rowKeys = append(rowKeys, row) 713 muts = append(muts, mut) 714 } 715 status, err := table.ApplyBulk(ctx, rowKeys, muts) 716 if err != nil { 717 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 718 } 719 if status != nil { 720 t.Fatalf("non-nil errors: %v", err) 721 } 722 723 // Read each row back 724 for rowKey, ss := range bulkData { 725 row, err := table.ReadRow(ctx, rowKey) 726 if err != nil { 727 t.Fatalf("Reading a bulk row: %v", err) 728 } 729 var wantItems []ReadItem 730 for _, val := range ss { 731 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 732 } 733 wantRow := Row{"bulk": wantItems} 734 if !testutil.Equal(row, wantRow) { 735 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 736 } 737 } 738 739 // Test bulk write errors. 740 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 741 badMut := NewMutation() 742 badMut.Set("badfamily", "col", ServerTime, nil) 743 badMut2 := NewMutation() 744 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 745 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 746 if err != nil { 747 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 748 } 749 if status == nil { 750 t.Fatalf("No errors for bad bulk mutation") 751 } else if status[0] == nil || status[1] == nil { 752 t.Fatalf("No error for bad bulk mutation") 753 } 754} 755 756func TestIntegration_Read(t *testing.T) { 757 ctx := context.Background() 758 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 759 if err != nil { 760 t.Fatal(err) 761 } 762 defer cleanup() 763 764 // Insert some data. 765 initialData := map[string][]string{ 766 "wmckinley": {"tjefferson"}, 767 "gwashington": {"jadams"}, 768 "tjefferson": {"gwashington", "jadams", "wmckinley"}, 769 "jadams": {"gwashington", "tjefferson"}, 770 } 771 for row, ss := range initialData { 772 mut := NewMutation() 773 for _, name := range ss { 774 mut.Set("follows", name, 1000, []byte("1")) 775 } 776 if err := table.Apply(ctx, row, mut); err != nil { 777 t.Fatalf("Mutating row %q: %v", row, err) 778 } 779 } 780 781 for _, test := range []struct { 782 desc string 783 rr RowSet 784 filter Filter // may be nil 785 limit ReadOption // may be nil 786 787 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 788 // and join with a comma. 789 want string 790 }{ 791 { 792 desc: "read all, unfiltered", 793 rr: RowRange{}, 794 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 795 }, 796 { 797 desc: "read with InfiniteRange, unfiltered", 798 rr: InfiniteRange("tjefferson"), 799 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 800 }, 801 { 802 desc: "read with NewRange, unfiltered", 803 rr: NewRange("gargamel", "hubbard"), 804 want: "gwashington-jadams-1", 805 }, 806 { 807 desc: "read with PrefixRange, unfiltered", 808 rr: PrefixRange("jad"), 809 want: "jadams-gwashington-1,jadams-tjefferson-1", 810 }, 811 { 812 desc: "read with SingleRow, unfiltered", 813 rr: SingleRow("wmckinley"), 814 want: "wmckinley-tjefferson-1", 815 }, 816 { 817 desc: "read all, with ColumnFilter", 818 rr: RowRange{}, 819 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 820 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 821 }, 822 { 823 desc: "read all, with ColumnFilter, prefix", 824 rr: RowRange{}, 825 filter: ColumnFilter("j"), // no matches 826 want: "", 827 }, 828 { 829 desc: "read range, with ColumnRangeFilter", 830 rr: RowRange{}, 831 filter: ColumnRangeFilter("follows", "h", "k"), 832 want: "gwashington-jadams-1,tjefferson-jadams-1", 833 }, 834 { 835 desc: "read range from empty, with ColumnRangeFilter", 836 rr: RowRange{}, 837 filter: ColumnRangeFilter("follows", "", "u"), 838 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 839 }, 840 { 841 desc: "read range from start to empty, with ColumnRangeFilter", 842 rr: RowRange{}, 843 filter: ColumnRangeFilter("follows", "h", ""), 844 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 845 }, 846 { 847 desc: "read with RowKeyFilter", 848 rr: RowRange{}, 849 filter: RowKeyFilter(".*wash.*"), 850 want: "gwashington-jadams-1", 851 }, 852 { 853 desc: "read with RowKeyFilter, prefix", 854 rr: RowRange{}, 855 filter: RowKeyFilter("gwash"), 856 want: "", 857 }, 858 { 859 desc: "read with RowKeyFilter, no matches", 860 rr: RowRange{}, 861 filter: RowKeyFilter(".*xxx.*"), 862 want: "", 863 }, 864 { 865 desc: "read with FamilyFilter, no matches", 866 rr: RowRange{}, 867 filter: FamilyFilter(".*xxx.*"), 868 want: "", 869 }, 870 { 871 desc: "read with ColumnFilter + row limit", 872 rr: RowRange{}, 873 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 874 limit: LimitRows(2), 875 want: "gwashington-jadams-1,jadams-tjefferson-1", 876 }, 877 { 878 desc: "read all, strip values", 879 rr: RowRange{}, 880 filter: StripValueFilter(), 881 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 882 }, 883 { 884 desc: "read with ColumnFilter + row limit + strip values", 885 rr: RowRange{}, 886 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" 887 limit: LimitRows(2), 888 want: "gwashington-jadams-,jadams-tjefferson-", 889 }, 890 { 891 desc: "read with condition, strip values on true", 892 rr: RowRange{}, 893 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 894 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 895 }, 896 { 897 desc: "read with condition, strip values on false", 898 rr: RowRange{}, 899 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 900 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 901 }, 902 { 903 desc: "read with ValueRangeFilter + row limit", 904 rr: RowRange{}, 905 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 906 limit: LimitRows(2), 907 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 908 }, 909 { 910 desc: "read with ValueRangeFilter, no match on exclusive end", 911 rr: RowRange{}, 912 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 913 want: "", 914 }, 915 { 916 desc: "read with ValueRangeFilter, no matches", 917 rr: RowRange{}, 918 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 919 want: "", 920 }, 921 { 922 desc: "read with InterleaveFilter, no matches on all filters", 923 rr: RowRange{}, 924 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 925 want: "", 926 }, 927 { 928 desc: "read with InterleaveFilter, no duplicate cells", 929 rr: RowRange{}, 930 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 931 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 932 }, 933 { 934 desc: "read with InterleaveFilter, with duplicate cells", 935 rr: RowRange{}, 936 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 937 want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 938 }, 939 { 940 desc: "read with a RowRangeList and no filter", 941 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 942 want: "gwashington-jadams-1,wmckinley-tjefferson-1", 943 }, 944 { 945 desc: "chain that excludes rows and matches nothing, in a condition", 946 rr: RowRange{}, 947 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 948 want: "", 949 }, 950 { 951 desc: "chain that ends with an interleave that has no match. covers #804", 952 rr: RowRange{}, 953 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 954 want: "", 955 }, 956 } { 957 t.Run(test.desc, func(t *testing.T) { 958 var opts []ReadOption 959 if test.filter != nil { 960 opts = append(opts, RowFilter(test.filter)) 961 } 962 if test.limit != nil { 963 opts = append(opts, test.limit) 964 } 965 var elt []string 966 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 967 for _, ris := range r { 968 for _, ri := range ris { 969 elt = append(elt, formatReadItem(ri)) 970 } 971 } 972 return true 973 }, opts...) 974 if err != nil { 975 t.Fatal(err) 976 } 977 if got := strings.Join(elt, ","); got != test.want { 978 t.Fatalf("got %q\nwant %q", got, test.want) 979 } 980 }) 981 } 982} 983 984func TestIntegration_SampleRowKeys(t *testing.T) { 985 ctx := context.Background() 986 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 987 if err != nil { 988 t.Fatal(err) 989 } 990 defer cleanup() 991 992 // Insert some data. 993 initialData := map[string][]string{ 994 "wmckinley11": {"tjefferson11"}, 995 "gwashington77": {"jadams77"}, 996 "tjefferson0": {"gwashington0", "jadams0"}, 997 } 998 999 for row, ss := range initialData { 1000 mut := NewMutation() 1001 for _, name := range ss { 1002 mut.Set("follows", name, 1000, []byte("1")) 1003 } 1004 if err := table.Apply(ctx, row, mut); err != nil { 1005 t.Fatalf("Mutating row %q: %v", row, err) 1006 } 1007 } 1008 sampleKeys, err := table.SampleRowKeys(context.Background()) 1009 if err != nil { 1010 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1011 } 1012 if len(sampleKeys) == 0 { 1013 t.Error("SampleRowKeys length 0") 1014 } 1015} 1016 1017func TestIntegration_Admin(t *testing.T) { 1018 testEnv, err := NewIntegrationEnv() 1019 if err != nil { 1020 t.Fatalf("IntegrationEnv: %v", err) 1021 } 1022 defer testEnv.Close() 1023 1024 timeout := 2 * time.Second 1025 if testEnv.Config().UseProd { 1026 timeout = 5 * time.Minute 1027 } 1028 ctx, _ := context.WithTimeout(context.Background(), timeout) 1029 1030 adminClient, err := testEnv.NewAdminClient() 1031 if err != nil { 1032 t.Fatalf("NewAdminClient: %v", err) 1033 } 1034 defer adminClient.Close() 1035 1036 iAdminClient, err := testEnv.NewInstanceAdminClient() 1037 if err != nil { 1038 t.Fatalf("NewInstanceAdminClient: %v", err) 1039 } 1040 if iAdminClient != nil { 1041 defer iAdminClient.Close() 1042 1043 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1044 if err != nil { 1045 t.Errorf("InstanceInfo: %v", err) 1046 } 1047 if iInfo.Name != adminClient.instance { 1048 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1049 } 1050 } 1051 1052 list := func() []string { 1053 tbls, err := adminClient.Tables(ctx) 1054 if err != nil { 1055 t.Fatalf("Fetching list of tables: %v", err) 1056 } 1057 sort.Strings(tbls) 1058 return tbls 1059 } 1060 containsAll := func(got, want []string) bool { 1061 gotSet := make(map[string]bool) 1062 1063 for _, s := range got { 1064 gotSet[s] = true 1065 } 1066 for _, s := range want { 1067 if !gotSet[s] { 1068 return false 1069 } 1070 } 1071 return true 1072 } 1073 1074 defer adminClient.DeleteTable(ctx, "mytable") 1075 1076 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1077 t.Fatalf("Creating table: %v", err) 1078 } 1079 1080 defer adminClient.DeleteTable(ctx, "myothertable") 1081 1082 if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { 1083 t.Fatalf("Creating table: %v", err) 1084 } 1085 1086 if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { 1087 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1088 } 1089 1090 must(adminClient.WaitForReplication(ctx, "mytable")) 1091 1092 if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { 1093 t.Fatalf("Deleting table: %v", err) 1094 } 1095 tables := list() 1096 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1097 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1098 } 1099 if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { 1100 t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted) 1101 } 1102 1103 tblConf := TableConf{ 1104 TableID: "conftable", 1105 Families: map[string]GCPolicy{ 1106 "fam1": MaxVersionsPolicy(1), 1107 "fam2": MaxVersionsPolicy(2), 1108 }, 1109 } 1110 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1111 t.Fatalf("Creating table from TableConf: %v", err) 1112 } 1113 defer adminClient.DeleteTable(ctx, tblConf.TableID) 1114 1115 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1116 if err != nil { 1117 t.Fatalf("Getting table info: %v", err) 1118 } 1119 sort.Strings(tblInfo.Families) 1120 wantFams := []string{"fam1", "fam2"} 1121 if !testutil.Equal(tblInfo.Families, wantFams) { 1122 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1123 } 1124 1125 // Populate mytable and drop row ranges 1126 if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { 1127 t.Fatalf("Creating column family: %v", err) 1128 } 1129 1130 client, err := testEnv.NewClient() 1131 if err != nil { 1132 t.Fatalf("NewClient: %v", err) 1133 } 1134 defer client.Close() 1135 1136 tbl := client.Open("mytable") 1137 1138 prefixes := []string{"a", "b", "c"} 1139 for _, prefix := range prefixes { 1140 for i := 0; i < 5; i++ { 1141 mut := NewMutation() 1142 mut.Set("cf", "col", 1000, []byte("1")) 1143 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1144 t.Fatalf("Mutating row: %v", err) 1145 } 1146 } 1147 } 1148 1149 if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { 1150 t.Errorf("DropRowRange a: %v", err) 1151 } 1152 if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { 1153 t.Errorf("DropRowRange c: %v", err) 1154 } 1155 if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { 1156 t.Errorf("DropRowRange x: %v", err) 1157 } 1158 1159 var gotRowCount int 1160 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1161 gotRowCount++ 1162 if !strings.HasPrefix(row.Key(), "b") { 1163 t.Errorf("Invalid row after dropping range: %v", row) 1164 } 1165 return true 1166 })) 1167 if gotRowCount != 5 { 1168 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1169 } 1170 1171 if err = adminClient.DropAllRows(ctx, "mytable"); err != nil { 1172 t.Errorf("DropAllRows mytable: %v", err) 1173 } 1174 1175 gotRowCount = 0 1176 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1177 gotRowCount++ 1178 return true 1179 })) 1180 if gotRowCount != 0 { 1181 t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0) 1182 } 1183} 1184 1185func TestIntegration_TableIam(t *testing.T) { 1186 testEnv, err := NewIntegrationEnv() 1187 if err != nil { 1188 t.Fatalf("IntegrationEnv: %v", err) 1189 } 1190 defer testEnv.Close() 1191 1192 if !testEnv.Config().UseProd { 1193 t.Skip("emulator doesn't support IAM Policy creation") 1194 } 1195 1196 timeout := 5 * time.Minute 1197 ctx, _ := context.WithTimeout(context.Background(), timeout) 1198 1199 adminClient, err := testEnv.NewAdminClient() 1200 if err != nil { 1201 t.Fatalf("NewAdminClient: %v", err) 1202 } 1203 defer adminClient.Close() 1204 1205 defer adminClient.DeleteTable(ctx, "mytable") 1206 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1207 t.Fatalf("Creating table: %v", err) 1208 } 1209 1210 // Verify that the IAM Controls work for Tables. 1211 iamHandle := adminClient.TableIAM("mytable") 1212 p, err := iamHandle.Policy(ctx) 1213 if err != nil { 1214 t.Errorf("Iam GetPolicy mytable: %v", err) 1215 } 1216 if err = iamHandle.SetPolicy(ctx, p); err != nil { 1217 t.Errorf("Iam SetPolicy mytable: %v", err) 1218 } 1219 if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil { 1220 t.Errorf("Iam TestPermissions mytable: %v", err) 1221 } 1222} 1223 1224func TestIntegration_AdminCreateInstance(t *testing.T) { 1225 if instanceToCreate == "" { 1226 t.Skip("instanceToCreate not set, skipping instance creation testing") 1227 } 1228 1229 testEnv, err := NewIntegrationEnv() 1230 if err != nil { 1231 t.Fatalf("IntegrationEnv: %v", err) 1232 } 1233 defer testEnv.Close() 1234 1235 if !testEnv.Config().UseProd { 1236 t.Skip("emulator doesn't support instance creation") 1237 } 1238 1239 timeout := 5 * time.Minute 1240 ctx, _ := context.WithTimeout(context.Background(), timeout) 1241 1242 iAdminClient, err := testEnv.NewInstanceAdminClient() 1243 if err != nil { 1244 t.Fatalf("NewInstanceAdminClient: %v", err) 1245 } 1246 defer iAdminClient.Close() 1247 1248 clusterID := instanceToCreate + "-cluster" 1249 1250 // Create a development instance 1251 conf := &InstanceConf{ 1252 InstanceId: instanceToCreate, 1253 ClusterId: clusterID, 1254 DisplayName: "test instance", 1255 Zone: instanceToCreateZone, 1256 InstanceType: DEVELOPMENT, 1257 } 1258 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1259 t.Fatalf("CreateInstance: %v", err) 1260 } 1261 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1262 1263 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1264 if err != nil { 1265 t.Fatalf("InstanceInfo: %v", err) 1266 } 1267 1268 // Basic return values are tested elsewhere, check instance type 1269 if iInfo.InstanceType != DEVELOPMENT { 1270 t.Fatalf("Instance is not DEVELOPMENT: %v", err) 1271 } 1272 1273 // Update everything we can about the instance in one call. 1274 confWithClusters := &InstanceWithClustersConfig{ 1275 InstanceID: instanceToCreate, 1276 DisplayName: "new display name", 1277 InstanceType: PRODUCTION, 1278 Clusters: []ClusterConfig{ 1279 {ClusterID: clusterID, NumNodes: 5, StorageType: HDD}}, 1280 } 1281 1282 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1283 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1284 } 1285 1286 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1287 if err != nil { 1288 t.Fatalf("InstanceInfo: %v", err) 1289 } 1290 1291 if iInfo.InstanceType != PRODUCTION { 1292 t.Fatalf("Instance type is not PRODUCTION: %v", err) 1293 } 1294 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1295 t.Fatalf("Display name: %q, want: %q", got, want) 1296 } 1297 1298 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1299 if err != nil { 1300 t.Fatalf("GetCluster: %v", err) 1301 } 1302 1303 if cInfo.ServeNodes != 5 { 1304 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1305 } 1306 1307 if cInfo.StorageType != HDD { 1308 t.Fatalf("StorageType: %v, want: %v", cInfo.StorageType, HDD) 1309 } 1310} 1311 1312func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) { 1313 if instanceToCreate == "" { 1314 t.Skip("instanceToCreate not set, skipping instance update testing") 1315 } 1316 1317 testEnv, err := NewIntegrationEnv() 1318 if err != nil { 1319 t.Fatalf("IntegrationEnv: %v", err) 1320 } 1321 defer testEnv.Close() 1322 1323 if !testEnv.Config().UseProd { 1324 t.Skip("emulator doesn't support instance creation") 1325 } 1326 1327 timeout := 5 * time.Minute 1328 ctx, _ := context.WithTimeout(context.Background(), timeout) 1329 1330 iAdminClient, err := testEnv.NewInstanceAdminClient() 1331 if err != nil { 1332 t.Fatalf("NewInstanceAdminClient: %v", err) 1333 } 1334 defer iAdminClient.Close() 1335 1336 clusterID := instanceToCreate + "-cluster" 1337 1338 // Create a development instance 1339 conf := &InstanceConf{ 1340 InstanceId: instanceToCreate, 1341 ClusterId: clusterID, 1342 DisplayName: "test instance", 1343 Zone: instanceToCreateZone, 1344 InstanceType: DEVELOPMENT, 1345 } 1346 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1347 t.Fatalf("CreateInstance: %v", err) 1348 } 1349 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1350 1351 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1352 if err != nil { 1353 t.Fatalf("InstanceInfo: %v", err) 1354 } 1355 1356 // Basic return values are tested elsewhere, check instance type 1357 if iInfo.InstanceType != DEVELOPMENT { 1358 t.Fatalf("Instance is not DEVELOPMENT: %v", err) 1359 } 1360 1361 // Update everything we can about the instance in one call. 1362 confWithClusters := &InstanceWithClustersConfig{ 1363 InstanceID: instanceToCreate, 1364 DisplayName: "new display name", 1365 InstanceType: PRODUCTION, 1366 Clusters: []ClusterConfig{ 1367 {ClusterID: clusterID, NumNodes: 5}}, 1368 } 1369 1370 results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1371 if err != nil { 1372 t.Fatalf("UpdateInstanceAndSyncClusters: %v", err) 1373 } 1374 1375 wantResults := UpdateInstanceResults{ 1376 InstanceUpdated: true, 1377 UpdatedClusters: []string{clusterID}, 1378 } 1379 if diff := testutil.Diff(*results, wantResults); diff != "" { 1380 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1381 } 1382 1383 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1384 if err != nil { 1385 t.Fatalf("InstanceInfo: %v", err) 1386 } 1387 1388 if iInfo.InstanceType != PRODUCTION { 1389 t.Fatalf("Instance type is not PRODUCTION: %v", err) 1390 } 1391 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1392 t.Fatalf("Display name: %q, want: %q", got, want) 1393 } 1394 1395 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1396 if err != nil { 1397 t.Fatalf("GetCluster: %v", err) 1398 } 1399 1400 if cInfo.ServeNodes != 5 { 1401 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1402 } 1403 1404 // Now add a second cluster as the only change. The first cluster must also be provided so 1405 // it is not removed. 1406 clusterID2 := clusterID + "-2" 1407 confWithClusters = &InstanceWithClustersConfig{ 1408 InstanceID: instanceToCreate, 1409 Clusters: []ClusterConfig{ 1410 {ClusterID: clusterID}, 1411 {ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}}, 1412 } 1413 1414 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1415 if err != nil { 1416 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1417 } 1418 1419 wantResults = UpdateInstanceResults{ 1420 InstanceUpdated: false, 1421 CreatedClusters: []string{clusterID2}, 1422 } 1423 if diff := testutil.Diff(*results, wantResults); diff != "" { 1424 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1425 } 1426 1427 // Now update one cluster and delete the other 1428 confWithClusters = &InstanceWithClustersConfig{ 1429 InstanceID: instanceToCreate, 1430 Clusters: []ClusterConfig{ 1431 {ClusterID: clusterID, NumNodes: 4}}, 1432 } 1433 1434 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1435 if err != nil { 1436 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1437 } 1438 1439 wantResults = UpdateInstanceResults{ 1440 InstanceUpdated: false, 1441 UpdatedClusters: []string{clusterID}, 1442 DeletedClusters: []string{clusterID2}, 1443 } 1444 1445 if diff := testutil.Diff(*results, wantResults); diff != "" { 1446 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1447 } 1448 1449 // Make sure the instance looks as we would expect 1450 clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId) 1451 if err != nil { 1452 t.Fatalf("Clusters: %v", err) 1453 } 1454 1455 if len(clusters) != 1 { 1456 t.Fatalf("Clusters length %v, want: 1", len(clusters)) 1457 } 1458 1459 wantCluster := &ClusterInfo{ 1460 Name: clusterID, 1461 Zone: instanceToCreateZone, 1462 ServeNodes: 4, 1463 State: "READY", 1464 } 1465 if diff := testutil.Diff(clusters[0], wantCluster); diff != "" { 1466 t.Fatalf("InstanceEquality: got - want +\n%s", diff) 1467 } 1468} 1469 1470func TestIntegration_AdminSnapshot(t *testing.T) { 1471 testEnv, err := NewIntegrationEnv() 1472 if err != nil { 1473 t.Fatalf("IntegrationEnv: %v", err) 1474 } 1475 defer testEnv.Close() 1476 1477 if !testEnv.Config().UseProd { 1478 t.Skip("emulator doesn't support snapshots") 1479 } 1480 1481 timeout := 2 * time.Second 1482 if testEnv.Config().UseProd { 1483 timeout = 5 * time.Minute 1484 } 1485 ctx, _ := context.WithTimeout(context.Background(), timeout) 1486 1487 adminClient, err := testEnv.NewAdminClient() 1488 if err != nil { 1489 t.Fatalf("NewAdminClient: %v", err) 1490 } 1491 defer adminClient.Close() 1492 1493 table := testEnv.Config().Table 1494 cluster := testEnv.Config().Cluster 1495 1496 list := func(cluster string) ([]*SnapshotInfo, error) { 1497 infos := []*SnapshotInfo(nil) 1498 1499 it := adminClient.Snapshots(ctx, cluster) 1500 for { 1501 s, err := it.Next() 1502 if err == iterator.Done { 1503 break 1504 } 1505 if err != nil { 1506 return nil, err 1507 } 1508 infos = append(infos, s) 1509 } 1510 return infos, err 1511 } 1512 1513 // Delete the table at the end of the test. Schedule ahead of time 1514 // in case the client fails 1515 defer adminClient.DeleteTable(ctx, table) 1516 1517 if err := adminClient.CreateTable(ctx, table); err != nil { 1518 t.Fatalf("Creating table: %v", err) 1519 } 1520 1521 // Precondition: no snapshots 1522 snapshots, err := list(cluster) 1523 if err != nil { 1524 t.Fatalf("Initial snapshot list: %v", err) 1525 } 1526 if got, want := len(snapshots), 0; got != want { 1527 t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) 1528 } 1529 1530 // Create snapshot 1531 defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") 1532 1533 if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { 1534 t.Fatalf("Creating snaphot: %v", err) 1535 } 1536 1537 // List snapshot 1538 snapshots, err = list(cluster) 1539 if err != nil { 1540 t.Fatalf("Listing snapshots: %v", err) 1541 } 1542 if got, want := len(snapshots), 1; got != want { 1543 t.Fatalf("Listing snapshot count: %d, want: %d", got, want) 1544 } 1545 if got, want := snapshots[0].Name, "mysnapshot"; got != want { 1546 t.Fatalf("Snapshot name: %s, want: %s", got, want) 1547 } 1548 if got, want := snapshots[0].SourceTable, table; got != want { 1549 t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) 1550 } 1551 if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 1552 t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) 1553 } 1554 1555 // Get snapshot 1556 snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") 1557 if err != nil { 1558 t.Fatalf("SnapshotInfo: %v", snapshot) 1559 } 1560 if got, want := *snapshot, *snapshots[0]; got != want { 1561 t.Fatalf("SnapshotInfo: %v, want: %v", got, want) 1562 } 1563 1564 // Restore 1565 restoredTable := table + "-restored" 1566 defer adminClient.DeleteTable(ctx, restoredTable) 1567 if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil { 1568 t.Fatalf("CreateTableFromSnapshot: %v", err) 1569 } 1570 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 1571 t.Fatalf("Restored TableInfo: %v", err) 1572 } 1573 1574 // Delete snapshot 1575 if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { 1576 t.Fatalf("DeleteSnapshot: %v", err) 1577 } 1578 snapshots, err = list(cluster) 1579 if err != nil { 1580 t.Fatalf("List after Delete: %v", err) 1581 } 1582 if got, want := len(snapshots), 0; got != want { 1583 t.Fatalf("List after delete len: %d, want: %d", got, want) 1584 } 1585} 1586 1587func TestIntegration_Granularity(t *testing.T) { 1588 testEnv, err := NewIntegrationEnv() 1589 if err != nil { 1590 t.Fatalf("IntegrationEnv: %v", err) 1591 } 1592 defer testEnv.Close() 1593 1594 timeout := 2 * time.Second 1595 if testEnv.Config().UseProd { 1596 timeout = 5 * time.Minute 1597 } 1598 ctx, _ := context.WithTimeout(context.Background(), timeout) 1599 1600 adminClient, err := testEnv.NewAdminClient() 1601 if err != nil { 1602 t.Fatalf("NewAdminClient: %v", err) 1603 } 1604 defer adminClient.Close() 1605 1606 list := func() []string { 1607 tbls, err := adminClient.Tables(ctx) 1608 if err != nil { 1609 t.Fatalf("Fetching list of tables: %v", err) 1610 } 1611 sort.Strings(tbls) 1612 return tbls 1613 } 1614 containsAll := func(got, want []string) bool { 1615 gotSet := make(map[string]bool) 1616 1617 for _, s := range got { 1618 gotSet[s] = true 1619 } 1620 for _, s := range want { 1621 if !gotSet[s] { 1622 return false 1623 } 1624 } 1625 return true 1626 } 1627 1628 defer adminClient.DeleteTable(ctx, "mytable") 1629 1630 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1631 t.Fatalf("Creating table: %v", err) 1632 } 1633 1634 tables := list() 1635 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1636 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1637 } 1638 1639 // calling ModifyColumnFamilies to check the granularity of table 1640 prefix := adminClient.instancePrefix() 1641 req := &btapb.ModifyColumnFamiliesRequest{ 1642 Name: prefix + "/tables/" + "mytable", 1643 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 1644 Id: "cf", 1645 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, 1646 }}, 1647 } 1648 table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req) 1649 if err != nil { 1650 t.Fatalf("Creating column family: %v", err) 1651 } 1652 if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { 1653 t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) 1654 } 1655} 1656 1657func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { 1658 testEnv, err := NewIntegrationEnv() 1659 if err != nil { 1660 t.Fatalf("IntegrationEnv: %v", err) 1661 } 1662 defer testEnv.Close() 1663 1664 timeout := 2 * time.Second 1665 if testEnv.Config().UseProd { 1666 timeout = 5 * time.Minute 1667 } 1668 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1669 defer cancel() 1670 1671 adminClient, err := testEnv.NewAdminClient() 1672 if err != nil { 1673 t.Fatalf("NewAdminClient: %v", err) 1674 } 1675 defer adminClient.Close() 1676 1677 iAdminClient, err := testEnv.NewInstanceAdminClient() 1678 if err != nil { 1679 t.Fatalf("NewInstanceAdminClient: %v", err) 1680 } 1681 1682 if iAdminClient == nil { 1683 return 1684 } 1685 1686 defer iAdminClient.Close() 1687 profile := ProfileConf{ 1688 ProfileID: "app_profile1", 1689 InstanceID: adminClient.instance, 1690 ClusterID: testEnv.Config().Cluster, 1691 Description: "creating new app profile 1", 1692 RoutingPolicy: SingleClusterRouting, 1693 } 1694 1695 createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) 1696 if err != nil { 1697 t.Fatalf("Creating app profile: %v", err) 1698 1699 } 1700 1701 gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1702 1703 if err != nil { 1704 t.Fatalf("Get app profile: %v", err) 1705 } 1706 1707 if !proto.Equal(createdProfile, gotProfile) { 1708 t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) 1709 1710 } 1711 1712 list := func(instanceID string) ([]*btapb.AppProfile, error) { 1713 profiles := []*btapb.AppProfile(nil) 1714 1715 it := iAdminClient.ListAppProfiles(ctx, instanceID) 1716 for { 1717 s, err := it.Next() 1718 if err == iterator.Done { 1719 break 1720 } 1721 if err != nil { 1722 return nil, err 1723 } 1724 profiles = append(profiles, s) 1725 } 1726 return profiles, err 1727 } 1728 1729 profiles, err := list(adminClient.instance) 1730 if err != nil { 1731 t.Fatalf("List app profile: %v", err) 1732 } 1733 1734 if got, want := len(profiles), 1; got != want { 1735 t.Fatalf("Initial app profile list len: %d, want: %d", got, want) 1736 } 1737 1738 for _, test := range []struct { 1739 desc string 1740 uattrs ProfileAttrsToUpdate 1741 want *btapb.AppProfile // nil means error 1742 }{ 1743 { 1744 desc: "empty update", 1745 uattrs: ProfileAttrsToUpdate{}, 1746 want: nil, 1747 }, 1748 1749 { 1750 desc: "empty description update", 1751 uattrs: ProfileAttrsToUpdate{Description: ""}, 1752 want: &btapb.AppProfile{ 1753 Name: gotProfile.Name, 1754 Description: "", 1755 RoutingPolicy: gotProfile.RoutingPolicy, 1756 Etag: gotProfile.Etag}, 1757 }, 1758 { 1759 desc: "routing update", 1760 uattrs: ProfileAttrsToUpdate{ 1761 RoutingPolicy: SingleClusterRouting, 1762 ClusterID: testEnv.Config().Cluster, 1763 }, 1764 want: &btapb.AppProfile{ 1765 Name: gotProfile.Name, 1766 Description: "", 1767 Etag: gotProfile.Etag, 1768 RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ 1769 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1770 ClusterId: testEnv.Config().Cluster, 1771 }}, 1772 }, 1773 }, 1774 } { 1775 err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) 1776 if err != nil { 1777 if test.want != nil { 1778 t.Errorf("%s: %v", test.desc, err) 1779 } 1780 continue 1781 } 1782 if err == nil && test.want == nil { 1783 t.Errorf("%s: got nil, want error", test.desc) 1784 continue 1785 } 1786 1787 got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1788 1789 if !proto.Equal(got, test.want) { 1790 t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want) 1791 } 1792 1793 } 1794 1795 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 1796 if err != nil { 1797 t.Fatalf("Delete app profile: %v", err) 1798 } 1799 1800} 1801 1802func TestIntegration_InstanceUpdate(t *testing.T) { 1803 testEnv, err := NewIntegrationEnv() 1804 if err != nil { 1805 t.Fatalf("IntegrationEnv: %v", err) 1806 } 1807 defer testEnv.Close() 1808 1809 timeout := 2 * time.Second 1810 if testEnv.Config().UseProd { 1811 timeout = 5 * time.Minute 1812 } 1813 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1814 defer cancel() 1815 1816 adminClient, err := testEnv.NewAdminClient() 1817 if err != nil { 1818 t.Fatalf("NewAdminClient: %v", err) 1819 } 1820 1821 defer adminClient.Close() 1822 1823 iAdminClient, err := testEnv.NewInstanceAdminClient() 1824 if err != nil { 1825 t.Fatalf("NewInstanceAdminClient: %v", err) 1826 } 1827 1828 if iAdminClient == nil { 1829 return 1830 } 1831 1832 defer iAdminClient.Close() 1833 1834 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1835 if err != nil { 1836 t.Errorf("InstanceInfo: %v", err) 1837 } 1838 1839 if iInfo.Name != adminClient.instance { 1840 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1841 } 1842 1843 if iInfo.DisplayName != adminClient.instance { 1844 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1845 } 1846 1847 const numNodes = 4 1848 // update cluster nodes 1849 if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { 1850 t.Errorf("UpdateCluster: %v", err) 1851 } 1852 1853 // get cluster after updating 1854 cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster) 1855 if err != nil { 1856 t.Errorf("GetCluster %v", err) 1857 } 1858 if cis.ServeNodes != int(numNodes) { 1859 t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes)) 1860 } 1861} 1862 1863func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { 1864 testEnv, err := NewIntegrationEnv() 1865 if err != nil { 1866 return nil, nil, nil, "", nil, err 1867 } 1868 1869 var timeout time.Duration 1870 if testEnv.Config().UseProd { 1871 timeout = 10 * time.Minute 1872 t.Logf("Running test against production") 1873 } else { 1874 timeout = 5 * time.Minute 1875 t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) 1876 } 1877 ctx, cancel := context.WithTimeout(ctx, timeout) 1878 1879 client, err := testEnv.NewClient() 1880 if err != nil { 1881 return nil, nil, nil, "", nil, err 1882 } 1883 1884 adminClient, err := testEnv.NewAdminClient() 1885 if err != nil { 1886 return nil, nil, nil, "", nil, err 1887 } 1888 1889 tableName = testEnv.Config().Table 1890 if err := adminClient.CreateTable(ctx, tableName); err != nil { 1891 cancel() 1892 return nil, nil, nil, "", nil, err 1893 } 1894 if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { 1895 cancel() 1896 return nil, nil, nil, "", nil, err 1897 } 1898 1899 return client, adminClient, client.Open(tableName), tableName, func() { 1900 if err := adminClient.DeleteTable(ctx, tableName); err != nil { 1901 t.Errorf("DeleteTable got error %v", err) 1902 } 1903 cancel() 1904 client.Close() 1905 adminClient.Close() 1906 }, nil 1907} 1908 1909func formatReadItem(ri ReadItem) string { 1910 // Use the column qualifier only to make the test data briefer. 1911 col := ri.Column[strings.Index(ri.Column, ":")+1:] 1912 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 1913} 1914 1915func fill(b, sub []byte) { 1916 for len(b) > len(sub) { 1917 n := copy(b, sub) 1918 b = b[n:] 1919 } 1920} 1921 1922func clearTimestamps(r Row) { 1923 for _, ris := range r { 1924 for i := range ris { 1925 ris[i].Timestamp = 0 1926 } 1927 } 1928} 1929