1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "flag" 22 "fmt" 23 "math" 24 "math/rand" 25 "sort" 26 "strings" 27 "sync" 28 "testing" 29 "time" 30 31 "cloud.google.com/go/internal/testutil" 32 "cloud.google.com/go/internal/uid" 33 "github.com/golang/protobuf/proto" 34 "google.golang.org/api/iterator" 35 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 36) 37 38var ( 39 presidentsSocialGraph = map[string][]string{ 40 "wmckinley": {"tjefferson"}, 41 "gwashington": {"jadams"}, 42 "tjefferson": {"gwashington", "jadams"}, 43 "jadams": {"gwashington", "tjefferson"}, 44 } 45 46 tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true}) 47) 48 49func populatePresidentsGraph(table *Table) error { 50 ctx := context.Background() 51 for row, ss := range presidentsSocialGraph { 52 mut := NewMutation() 53 for _, name := range ss { 54 mut.Set("follows", name, 1000, []byte("1")) 55 } 56 if err := table.Apply(ctx, row, mut); err != nil { 57 return fmt.Errorf("Mutating row %q: %v", row, err) 58 } 59 } 60 return nil 61} 62 63var instanceToCreate string 64var instanceToCreateZone string 65var instanceToCreateZone2 string 66 67func init() { 68 // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. 69 flag.StringVar(&instanceToCreate, "it.instance-to-create", "", 70 "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.") 71 flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b", 72 "The zone in which to create the new test instance.") 73 flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", 74 "The zone in which to create a second cluster in the test instance.") 75} 76 77func TestIntegration_ConditionalMutations(t *testing.T) { 78 ctx := context.Background() 79 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 80 if err != nil { 81 t.Fatal(err) 82 } 83 defer cleanup() 84 85 if err := populatePresidentsGraph(table); err != nil { 86 t.Fatal(err) 87 } 88 89 // Do a conditional mutation with a complex filter. 90 mutTrue := NewMutation() 91 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 92 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 93 mut := NewCondMutation(filter, mutTrue, nil) 94 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 95 t.Fatalf("Conditionally mutating row: %v", err) 96 } 97 // Do a second condition mutation with a filter that does not match, 98 // and thus no changes should be made. 99 mutTrue = NewMutation() 100 mutTrue.DeleteRow() 101 filter = ColumnFilter("snoop.dogg") 102 mut = NewCondMutation(filter, mutTrue, nil) 103 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 104 t.Fatalf("Conditionally mutating row: %v", err) 105 } 106 107 // Fetch a row. 108 row, err := table.ReadRow(ctx, "jadams") 109 if err != nil { 110 t.Fatalf("Reading a row: %v", err) 111 } 112 wantRow := Row{ 113 "follows": []ReadItem{ 114 {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 115 {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 116 }, 117 } 118 if !testutil.Equal(row, wantRow) { 119 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 120 } 121} 122 123func TestIntegration_PartialReadRows(t *testing.T) { 124 ctx := context.Background() 125 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 126 if err != nil { 127 t.Fatal(err) 128 } 129 defer cleanup() 130 131 if err := populatePresidentsGraph(table); err != nil { 132 t.Fatal(err) 133 } 134 135 // Do a scan and stop part way through. 136 // Verify that the ReadRows callback doesn't keep running. 137 stopped := false 138 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 139 if r.Key() < "h" { 140 return true 141 } 142 if !stopped { 143 stopped = true 144 return false 145 } 146 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 147 return false 148 }) 149 if err != nil { 150 t.Fatalf("Partial ReadRows: %v", err) 151 } 152} 153 154func TestIntegration_ReadRowList(t *testing.T) { 155 ctx := context.Background() 156 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 157 if err != nil { 158 t.Fatal(err) 159 } 160 defer cleanup() 161 162 if err := populatePresidentsGraph(table); err != nil { 163 t.Fatal(err) 164 } 165 166 // Read a RowList 167 var elt []string 168 keys := RowList{"wmckinley", "gwashington", "jadams"} 169 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 170 err = table.ReadRows(ctx, keys, func(r Row) bool { 171 for _, ris := range r { 172 for _, ri := range ris { 173 elt = append(elt, formatReadItem(ri)) 174 } 175 } 176 return true 177 }) 178 if err != nil { 179 t.Fatalf("read RowList: %v", err) 180 } 181 182 if got := strings.Join(elt, ","); got != want { 183 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 184 } 185} 186 187func TestIntegration_DeleteRow(t *testing.T) { 188 ctx := context.Background() 189 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 190 if err != nil { 191 t.Fatal(err) 192 } 193 defer cleanup() 194 195 if err := populatePresidentsGraph(table); err != nil { 196 t.Fatal(err) 197 } 198 199 // Delete a row and check it goes away. 200 mut := NewMutation() 201 mut.DeleteRow() 202 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 203 t.Fatalf("Apply DeleteRow: %v", err) 204 } 205 row, err := table.ReadRow(ctx, "wmckinley") 206 if err != nil { 207 t.Fatalf("Reading a row after DeleteRow: %v", err) 208 } 209 if len(row) != 0 { 210 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 211 } 212} 213 214func TestIntegration_ReadModifyWrite(t *testing.T) { 215 ctx := context.Background() 216 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 217 if err != nil { 218 t.Fatal(err) 219 } 220 defer cleanup() 221 222 if err := populatePresidentsGraph(table); err != nil { 223 t.Fatal(err) 224 } 225 226 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 227 t.Fatalf("Creating column family: %v", err) 228 } 229 230 appendRMW := func(b []byte) *ReadModifyWrite { 231 rmw := NewReadModifyWrite() 232 rmw.AppendValue("counter", "likes", b) 233 return rmw 234 } 235 incRMW := func(n int64) *ReadModifyWrite { 236 rmw := NewReadModifyWrite() 237 rmw.Increment("counter", "likes", n) 238 return rmw 239 } 240 rmwSeq := []struct { 241 desc string 242 rmw *ReadModifyWrite 243 want []byte 244 }{ 245 { 246 desc: "append #1", 247 rmw: appendRMW([]byte{0, 0, 0}), 248 want: []byte{0, 0, 0}, 249 }, 250 { 251 desc: "append #2", 252 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 253 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 254 }, 255 { 256 desc: "increment", 257 rmw: incRMW(8), 258 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 259 }, 260 } 261 for _, step := range rmwSeq { 262 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 263 if err != nil { 264 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 265 } 266 // Make sure the modified cell returned by the RMW operation has a timestamp. 267 if row["counter"][0].Timestamp == 0 { 268 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 269 } 270 clearTimestamps(row) 271 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 272 if !testutil.Equal(row, wantRow) { 273 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 274 } 275 } 276 277 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 278 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 279 if err != nil { 280 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 281 } 282 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 283 if err != nil { 284 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 285 } 286 // Get only the correct row back on read. 287 r, err := table.ReadRow(ctx, "issue-723-1") 288 if err != nil { 289 t.Fatalf("Reading row: %v", err) 290 } 291 if r.Key() != "issue-723-1" { 292 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 293 } 294} 295 296func TestIntegration_ArbitraryTimestamps(t *testing.T) { 297 ctx := context.Background() 298 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 299 if err != nil { 300 t.Fatal(err) 301 } 302 defer cleanup() 303 304 // Test arbitrary timestamps more thoroughly. 305 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 306 t.Fatalf("Creating column family: %v", err) 307 } 308 const numVersions = 4 309 mut := NewMutation() 310 for i := 1; i < numVersions; i++ { 311 // Timestamps are used in thousands because the server 312 // only permits that granularity. 313 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 314 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 315 } 316 if err := table.Apply(ctx, "testrow", mut); err != nil { 317 t.Fatalf("Mutating row: %v", err) 318 } 319 r, err := table.ReadRow(ctx, "testrow") 320 if err != nil { 321 t.Fatalf("Reading row: %v", err) 322 } 323 wantRow := Row{"ts": []ReadItem{ 324 // These should be returned in descending timestamp order. 325 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 326 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 327 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 328 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 329 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 330 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 331 }} 332 if !testutil.Equal(r, wantRow) { 333 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 334 } 335 336 // Do the same read, but filter to the latest two versions. 337 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 338 if err != nil { 339 t.Fatalf("Reading row: %v", err) 340 } 341 wantRow = Row{"ts": []ReadItem{ 342 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 343 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 344 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 345 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 346 }} 347 if !testutil.Equal(r, wantRow) { 348 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 349 } 350 // Check cell offset / limit 351 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 352 if err != nil { 353 t.Fatalf("Reading row: %v", err) 354 } 355 wantRow = Row{"ts": []ReadItem{ 356 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 357 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 358 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 359 }} 360 if !testutil.Equal(r, wantRow) { 361 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 362 } 363 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) 364 if err != nil { 365 t.Fatalf("Reading row: %v", err) 366 } 367 wantRow = Row{"ts": []ReadItem{ 368 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 369 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 370 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 371 }} 372 if !testutil.Equal(r, wantRow) { 373 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 374 } 375 // Check timestamp range filtering (with truncation) 376 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 377 if err != nil { 378 t.Fatalf("Reading row: %v", err) 379 } 380 wantRow = Row{"ts": []ReadItem{ 381 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 382 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 383 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 384 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 385 }} 386 if !testutil.Equal(r, wantRow) { 387 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 388 } 389 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 390 if err != nil { 391 t.Fatalf("Reading row: %v", err) 392 } 393 wantRow = Row{"ts": []ReadItem{ 394 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 395 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 396 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 397 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 398 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 399 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 400 }} 401 if !testutil.Equal(r, wantRow) { 402 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 403 } 404 // Delete non-existing cells, no such column family in this row 405 // Should not delete anything 406 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 407 t.Fatalf("Creating column family: %v", err) 408 } 409 mut = NewMutation() 410 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 411 if err := table.Apply(ctx, "testrow", mut); err != nil { 412 t.Fatalf("Mutating row: %v", err) 413 } 414 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 415 if err != nil { 416 t.Fatalf("Reading row: %v", err) 417 } 418 if !testutil.Equal(r, wantRow) { 419 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 420 } 421 // Delete non-existing cells, no such column in this column family 422 // Should not delete anything 423 mut = NewMutation() 424 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 425 if err := table.Apply(ctx, "testrow", mut); err != nil { 426 t.Fatalf("Mutating row: %v", err) 427 } 428 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 429 if err != nil { 430 t.Fatalf("Reading row: %v", err) 431 } 432 if !testutil.Equal(r, wantRow) { 433 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 434 } 435 // Delete the cell with timestamp 2000 and repeat the last read, 436 // checking that we get ts 3000 and ts 1000. 437 mut = NewMutation() 438 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 439 if err := table.Apply(ctx, "testrow", mut); err != nil { 440 t.Fatalf("Mutating row: %v", err) 441 } 442 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 443 if err != nil { 444 t.Fatalf("Reading row: %v", err) 445 } 446 wantRow = Row{"ts": []ReadItem{ 447 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 448 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 449 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 450 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 451 }} 452 if !testutil.Equal(r, wantRow) { 453 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 454 } 455 456 // Check DeleteCellsInFamily 457 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 458 t.Fatalf("Creating column family: %v", err) 459 } 460 461 mut = NewMutation() 462 mut.Set("status", "start", 2000, []byte("2")) 463 mut.Set("status", "end", 3000, []byte("3")) 464 mut.Set("ts", "col", 1000, []byte("1")) 465 if err := table.Apply(ctx, "row1", mut); err != nil { 466 t.Fatalf("Mutating row: %v", err) 467 } 468 if err := table.Apply(ctx, "row2", mut); err != nil { 469 t.Fatalf("Mutating row: %v", err) 470 } 471 472 mut = NewMutation() 473 mut.DeleteCellsInFamily("status") 474 if err := table.Apply(ctx, "row1", mut); err != nil { 475 t.Fatalf("Delete cf: %v", err) 476 } 477 478 // ColumnFamily removed 479 r, err = table.ReadRow(ctx, "row1") 480 if err != nil { 481 t.Fatalf("Reading row: %v", err) 482 } 483 wantRow = Row{"ts": []ReadItem{ 484 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 485 }} 486 if !testutil.Equal(r, wantRow) { 487 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 488 } 489 490 // ColumnFamily not removed 491 r, err = table.ReadRow(ctx, "row2") 492 if err != nil { 493 t.Fatalf("Reading row: %v", err) 494 } 495 wantRow = Row{ 496 "ts": []ReadItem{ 497 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 498 }, 499 "status": []ReadItem{ 500 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 501 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 502 }, 503 } 504 if !testutil.Equal(r, wantRow) { 505 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 506 } 507 508 // Check DeleteCellsInColumn 509 mut = NewMutation() 510 mut.Set("status", "start", 2000, []byte("2")) 511 mut.Set("status", "middle", 3000, []byte("3")) 512 mut.Set("status", "end", 1000, []byte("1")) 513 if err := table.Apply(ctx, "row3", mut); err != nil { 514 t.Fatalf("Mutating row: %v", err) 515 } 516 mut = NewMutation() 517 mut.DeleteCellsInColumn("status", "middle") 518 if err := table.Apply(ctx, "row3", mut); err != nil { 519 t.Fatalf("Delete column: %v", err) 520 } 521 r, err = table.ReadRow(ctx, "row3") 522 if err != nil { 523 t.Fatalf("Reading row: %v", err) 524 } 525 wantRow = Row{ 526 "status": []ReadItem{ 527 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 528 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 529 }, 530 } 531 if !testutil.Equal(r, wantRow) { 532 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 533 } 534 mut = NewMutation() 535 mut.DeleteCellsInColumn("status", "start") 536 if err := table.Apply(ctx, "row3", mut); err != nil { 537 t.Fatalf("Delete column: %v", err) 538 } 539 r, err = table.ReadRow(ctx, "row3") 540 if err != nil { 541 t.Fatalf("Reading row: %v", err) 542 } 543 wantRow = Row{ 544 "status": []ReadItem{ 545 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 546 }, 547 } 548 if !testutil.Equal(r, wantRow) { 549 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 550 } 551 mut = NewMutation() 552 mut.DeleteCellsInColumn("status", "end") 553 if err := table.Apply(ctx, "row3", mut); err != nil { 554 t.Fatalf("Delete column: %v", err) 555 } 556 r, err = table.ReadRow(ctx, "row3") 557 if err != nil { 558 t.Fatalf("Reading row: %v", err) 559 } 560 if len(r) != 0 { 561 t.Fatalf("Delete column: got %v, want empty row", r) 562 } 563 // Add same cell after delete 564 mut = NewMutation() 565 mut.Set("status", "end", 1000, []byte("1")) 566 if err := table.Apply(ctx, "row3", mut); err != nil { 567 t.Fatalf("Mutating row: %v", err) 568 } 569 r, err = table.ReadRow(ctx, "row3") 570 if err != nil { 571 t.Fatalf("Reading row: %v", err) 572 } 573 if !testutil.Equal(r, wantRow) { 574 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 575 } 576} 577 578func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 579 ctx := context.Background() 580 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 581 if err != nil { 582 t.Fatal(err) 583 } 584 defer cleanup() 585 586 if err := populatePresidentsGraph(table); err != nil { 587 t.Fatal(err) 588 } 589 590 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 591 t.Fatalf("Creating column family: %v", err) 592 } 593 594 // Do highly concurrent reads/writes. 595 const maxConcurrency = 1000 596 var wg sync.WaitGroup 597 for i := 0; i < maxConcurrency; i++ { 598 wg.Add(1) 599 go func() { 600 defer wg.Done() 601 switch r := rand.Intn(100); { // r ∈ [0,100) 602 case 0 <= r && r < 30: 603 // Do a read. 604 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 605 if err != nil { 606 t.Errorf("Concurrent read: %v", err) 607 } 608 case 30 <= r && r < 100: 609 // Do a write. 610 mut := NewMutation() 611 mut.Set("ts", "col", 1000, []byte("data")) 612 if err := table.Apply(ctx, "testrow", mut); err != nil { 613 t.Errorf("Concurrent write: %v", err) 614 } 615 } 616 }() 617 } 618 wg.Wait() 619} 620 621func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 622 ctx := context.Background() 623 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 624 if err != nil { 625 t.Fatal(err) 626 } 627 defer cleanup() 628 629 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 630 t.Fatalf("Creating column family: %v", err) 631 } 632 633 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 634 nonsense := []byte("lorem ipsum dolor sit amet, ") 635 fill(bigBytes, nonsense) 636 mut := NewMutation() 637 mut.Set("ts", "col", 1000, bigBytes) 638 if err := table.Apply(ctx, "bigrow", mut); err != nil { 639 t.Fatalf("Big write: %v", err) 640 } 641 r, err := table.ReadRow(ctx, "bigrow") 642 if err != nil { 643 t.Fatalf("Big read: %v", err) 644 } 645 wantRow := Row{"ts": []ReadItem{ 646 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 647 }} 648 if !testutil.Equal(r, wantRow) { 649 t.Fatalf("Big read returned incorrect bytes: %v", r) 650 } 651 652 var wg sync.WaitGroup 653 // Now write 1000 rows, each with 82 KB values, then scan them all. 654 medBytes := make([]byte, 82<<10) 655 fill(medBytes, nonsense) 656 sem := make(chan int, 50) // do up to 50 mutations at a time. 657 for i := 0; i < 1000; i++ { 658 mut := NewMutation() 659 mut.Set("ts", "big-scan", 1000, medBytes) 660 row := fmt.Sprintf("row-%d", i) 661 wg.Add(1) 662 go func() { 663 defer wg.Done() 664 defer func() { <-sem }() 665 sem <- 1 666 if err := table.Apply(ctx, row, mut); err != nil { 667 t.Errorf("Preparing large scan: %v", err) 668 } 669 }() 670 } 671 wg.Wait() 672 n := 0 673 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 674 for _, ris := range r { 675 for _, ri := range ris { 676 n += len(ri.Value) 677 } 678 } 679 return true 680 }, RowFilter(ColumnFilter("big-scan"))) 681 if err != nil { 682 t.Fatalf("Doing large scan: %v", err) 683 } 684 if want := 1000 * len(medBytes); n != want { 685 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 686 } 687 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 688 rc := 0 689 wantRc := 3 690 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 691 rc++ 692 return true 693 }, LimitRows(int64(wantRc))) 694 if err != nil { 695 t.Fatal(err) 696 } 697 if rc != wantRc { 698 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 699 } 700 701 // Test bulk mutations 702 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 703 t.Fatalf("Creating column family: %v", err) 704 } 705 bulkData := map[string][]string{ 706 "red sox": {"2004", "2007", "2013"}, 707 "patriots": {"2001", "2003", "2004", "2014"}, 708 "celtics": {"1981", "1984", "1986", "2008"}, 709 } 710 var rowKeys []string 711 var muts []*Mutation 712 for row, ss := range bulkData { 713 mut := NewMutation() 714 for _, name := range ss { 715 mut.Set("bulk", name, 1000, []byte("1")) 716 } 717 rowKeys = append(rowKeys, row) 718 muts = append(muts, mut) 719 } 720 status, err := table.ApplyBulk(ctx, rowKeys, muts) 721 if err != nil { 722 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 723 } 724 if status != nil { 725 t.Fatalf("non-nil errors: %v", err) 726 } 727 728 // Read each row back 729 for rowKey, ss := range bulkData { 730 row, err := table.ReadRow(ctx, rowKey) 731 if err != nil { 732 t.Fatalf("Reading a bulk row: %v", err) 733 } 734 var wantItems []ReadItem 735 for _, val := range ss { 736 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 737 } 738 wantRow := Row{"bulk": wantItems} 739 if !testutil.Equal(row, wantRow) { 740 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 741 } 742 } 743 744 // Test bulk write errors. 745 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 746 badMut := NewMutation() 747 badMut.Set("badfamily", "col", ServerTime, nil) 748 badMut2 := NewMutation() 749 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 750 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 751 if err != nil { 752 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 753 } 754 if status == nil { 755 t.Fatalf("No errors for bad bulk mutation") 756 } else if status[0] == nil || status[1] == nil { 757 t.Fatalf("No error for bad bulk mutation") 758 } 759} 760 761func TestIntegration_Read(t *testing.T) { 762 ctx := context.Background() 763 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 764 if err != nil { 765 t.Fatal(err) 766 } 767 defer cleanup() 768 769 // Insert some data. 770 initialData := map[string][]string{ 771 "wmckinley": {"tjefferson"}, 772 "gwashington": {"jadams"}, 773 "tjefferson": {"gwashington", "jadams", "wmckinley"}, 774 "jadams": {"gwashington", "tjefferson"}, 775 } 776 for row, ss := range initialData { 777 mut := NewMutation() 778 for _, name := range ss { 779 mut.Set("follows", name, 1000, []byte("1")) 780 } 781 if err := table.Apply(ctx, row, mut); err != nil { 782 t.Fatalf("Mutating row %q: %v", row, err) 783 } 784 } 785 786 for _, test := range []struct { 787 desc string 788 rr RowSet 789 filter Filter // may be nil 790 limit ReadOption // may be nil 791 792 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 793 // and join with a comma. 794 want string 795 }{ 796 { 797 desc: "read all, unfiltered", 798 rr: RowRange{}, 799 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 800 }, 801 { 802 desc: "read with InfiniteRange, unfiltered", 803 rr: InfiniteRange("tjefferson"), 804 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 805 }, 806 { 807 desc: "read with NewRange, unfiltered", 808 rr: NewRange("gargamel", "hubbard"), 809 want: "gwashington-jadams-1", 810 }, 811 { 812 desc: "read with PrefixRange, unfiltered", 813 rr: PrefixRange("jad"), 814 want: "jadams-gwashington-1,jadams-tjefferson-1", 815 }, 816 { 817 desc: "read with SingleRow, unfiltered", 818 rr: SingleRow("wmckinley"), 819 want: "wmckinley-tjefferson-1", 820 }, 821 { 822 desc: "read all, with ColumnFilter", 823 rr: RowRange{}, 824 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 825 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 826 }, 827 { 828 desc: "read all, with ColumnFilter, prefix", 829 rr: RowRange{}, 830 filter: ColumnFilter("j"), // no matches 831 want: "", 832 }, 833 { 834 desc: "read range, with ColumnRangeFilter", 835 rr: RowRange{}, 836 filter: ColumnRangeFilter("follows", "h", "k"), 837 want: "gwashington-jadams-1,tjefferson-jadams-1", 838 }, 839 { 840 desc: "read range from empty, with ColumnRangeFilter", 841 rr: RowRange{}, 842 filter: ColumnRangeFilter("follows", "", "u"), 843 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 844 }, 845 { 846 desc: "read range from start to empty, with ColumnRangeFilter", 847 rr: RowRange{}, 848 filter: ColumnRangeFilter("follows", "h", ""), 849 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 850 }, 851 { 852 desc: "read with RowKeyFilter", 853 rr: RowRange{}, 854 filter: RowKeyFilter(".*wash.*"), 855 want: "gwashington-jadams-1", 856 }, 857 { 858 desc: "read with RowKeyFilter, prefix", 859 rr: RowRange{}, 860 filter: RowKeyFilter("gwash"), 861 want: "", 862 }, 863 { 864 desc: "read with RowKeyFilter, no matches", 865 rr: RowRange{}, 866 filter: RowKeyFilter(".*xxx.*"), 867 want: "", 868 }, 869 { 870 desc: "read with FamilyFilter, no matches", 871 rr: RowRange{}, 872 filter: FamilyFilter(".*xxx.*"), 873 want: "", 874 }, 875 { 876 desc: "read with ColumnFilter + row limit", 877 rr: RowRange{}, 878 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 879 limit: LimitRows(2), 880 want: "gwashington-jadams-1,jadams-tjefferson-1", 881 }, 882 { 883 desc: "read all, strip values", 884 rr: RowRange{}, 885 filter: StripValueFilter(), 886 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 887 }, 888 { 889 desc: "read with ColumnFilter + row limit + strip values", 890 rr: RowRange{}, 891 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" 892 limit: LimitRows(2), 893 want: "gwashington-jadams-,jadams-tjefferson-", 894 }, 895 { 896 desc: "read with condition, strip values on true", 897 rr: RowRange{}, 898 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 899 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 900 }, 901 { 902 desc: "read with condition, strip values on false", 903 rr: RowRange{}, 904 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 905 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 906 }, 907 { 908 desc: "read with ValueRangeFilter + row limit", 909 rr: RowRange{}, 910 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 911 limit: LimitRows(2), 912 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 913 }, 914 { 915 desc: "read with ValueRangeFilter, no match on exclusive end", 916 rr: RowRange{}, 917 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 918 want: "", 919 }, 920 { 921 desc: "read with ValueRangeFilter, no matches", 922 rr: RowRange{}, 923 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 924 want: "", 925 }, 926 { 927 desc: "read with InterleaveFilter, no matches on all filters", 928 rr: RowRange{}, 929 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 930 want: "", 931 }, 932 { 933 desc: "read with InterleaveFilter, no duplicate cells", 934 rr: RowRange{}, 935 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 936 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 937 }, 938 { 939 desc: "read with InterleaveFilter, with duplicate cells", 940 rr: RowRange{}, 941 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 942 want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 943 }, 944 { 945 desc: "read with a RowRangeList and no filter", 946 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 947 want: "gwashington-jadams-1,wmckinley-tjefferson-1", 948 }, 949 { 950 desc: "chain that excludes rows and matches nothing, in a condition", 951 rr: RowRange{}, 952 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 953 want: "", 954 }, 955 { 956 desc: "chain that ends with an interleave that has no match. covers #804", 957 rr: RowRange{}, 958 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 959 want: "", 960 }, 961 } { 962 t.Run(test.desc, func(t *testing.T) { 963 var opts []ReadOption 964 if test.filter != nil { 965 opts = append(opts, RowFilter(test.filter)) 966 } 967 if test.limit != nil { 968 opts = append(opts, test.limit) 969 } 970 var elt []string 971 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 972 for _, ris := range r { 973 for _, ri := range ris { 974 elt = append(elt, formatReadItem(ri)) 975 } 976 } 977 return true 978 }, opts...) 979 if err != nil { 980 t.Fatal(err) 981 } 982 if got := strings.Join(elt, ","); got != test.want { 983 t.Fatalf("got %q\nwant %q", got, test.want) 984 } 985 }) 986 } 987} 988 989func TestIntegration_SampleRowKeys(t *testing.T) { 990 ctx := context.Background() 991 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 992 if err != nil { 993 t.Fatal(err) 994 } 995 defer cleanup() 996 997 // Insert some data. 998 initialData := map[string][]string{ 999 "wmckinley11": {"tjefferson11"}, 1000 "gwashington77": {"jadams77"}, 1001 "tjefferson0": {"gwashington0", "jadams0"}, 1002 } 1003 1004 for row, ss := range initialData { 1005 mut := NewMutation() 1006 for _, name := range ss { 1007 mut.Set("follows", name, 1000, []byte("1")) 1008 } 1009 if err := table.Apply(ctx, row, mut); err != nil { 1010 t.Fatalf("Mutating row %q: %v", row, err) 1011 } 1012 } 1013 sampleKeys, err := table.SampleRowKeys(context.Background()) 1014 if err != nil { 1015 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1016 } 1017 if len(sampleKeys) == 0 { 1018 t.Error("SampleRowKeys length 0") 1019 } 1020} 1021 1022func TestIntegration_Admin(t *testing.T) { 1023 testEnv, err := NewIntegrationEnv() 1024 if err != nil { 1025 t.Fatalf("IntegrationEnv: %v", err) 1026 } 1027 defer testEnv.Close() 1028 1029 timeout := 2 * time.Second 1030 if testEnv.Config().UseProd { 1031 timeout = 5 * time.Minute 1032 } 1033 ctx, _ := context.WithTimeout(context.Background(), timeout) 1034 1035 adminClient, err := testEnv.NewAdminClient() 1036 if err != nil { 1037 t.Fatalf("NewAdminClient: %v", err) 1038 } 1039 defer adminClient.Close() 1040 1041 iAdminClient, err := testEnv.NewInstanceAdminClient() 1042 if err != nil { 1043 t.Fatalf("NewInstanceAdminClient: %v", err) 1044 } 1045 if iAdminClient != nil { 1046 defer iAdminClient.Close() 1047 1048 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1049 if err != nil { 1050 t.Errorf("InstanceInfo: %v", err) 1051 } 1052 if iInfo.Name != adminClient.instance { 1053 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1054 } 1055 } 1056 1057 list := func() []string { 1058 tbls, err := adminClient.Tables(ctx) 1059 if err != nil { 1060 t.Fatalf("Fetching list of tables: %v", err) 1061 } 1062 sort.Strings(tbls) 1063 return tbls 1064 } 1065 containsAll := func(got, want []string) bool { 1066 gotSet := make(map[string]bool) 1067 1068 for _, s := range got { 1069 gotSet[s] = true 1070 } 1071 for _, s := range want { 1072 if !gotSet[s] { 1073 return false 1074 } 1075 } 1076 return true 1077 } 1078 1079 defer adminClient.DeleteTable(ctx, "mytable") 1080 1081 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1082 t.Fatalf("Creating table: %v", err) 1083 } 1084 1085 defer adminClient.DeleteTable(ctx, "myothertable") 1086 1087 if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { 1088 t.Fatalf("Creating table: %v", err) 1089 } 1090 1091 if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { 1092 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1093 } 1094 1095 must(adminClient.WaitForReplication(ctx, "mytable")) 1096 1097 if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { 1098 t.Fatalf("Deleting table: %v", err) 1099 } 1100 tables := list() 1101 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1102 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1103 } 1104 if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { 1105 t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted) 1106 } 1107 1108 tblConf := TableConf{ 1109 TableID: "conftable", 1110 Families: map[string]GCPolicy{ 1111 "fam1": MaxVersionsPolicy(1), 1112 "fam2": MaxVersionsPolicy(2), 1113 }, 1114 } 1115 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1116 t.Fatalf("Creating table from TableConf: %v", err) 1117 } 1118 defer adminClient.DeleteTable(ctx, tblConf.TableID) 1119 1120 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1121 if err != nil { 1122 t.Fatalf("Getting table info: %v", err) 1123 } 1124 sort.Strings(tblInfo.Families) 1125 wantFams := []string{"fam1", "fam2"} 1126 if !testutil.Equal(tblInfo.Families, wantFams) { 1127 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1128 } 1129 1130 // Populate mytable and drop row ranges 1131 if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { 1132 t.Fatalf("Creating column family: %v", err) 1133 } 1134 1135 client, err := testEnv.NewClient() 1136 if err != nil { 1137 t.Fatalf("NewClient: %v", err) 1138 } 1139 defer client.Close() 1140 1141 tbl := client.Open("mytable") 1142 1143 prefixes := []string{"a", "b", "c"} 1144 for _, prefix := range prefixes { 1145 for i := 0; i < 5; i++ { 1146 mut := NewMutation() 1147 mut.Set("cf", "col", 1000, []byte("1")) 1148 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1149 t.Fatalf("Mutating row: %v", err) 1150 } 1151 } 1152 } 1153 1154 if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { 1155 t.Errorf("DropRowRange a: %v", err) 1156 } 1157 if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { 1158 t.Errorf("DropRowRange c: %v", err) 1159 } 1160 if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { 1161 t.Errorf("DropRowRange x: %v", err) 1162 } 1163 1164 var gotRowCount int 1165 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1166 gotRowCount++ 1167 if !strings.HasPrefix(row.Key(), "b") { 1168 t.Errorf("Invalid row after dropping range: %v", row) 1169 } 1170 return true 1171 })) 1172 if gotRowCount != 5 { 1173 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1174 } 1175 1176 if err = adminClient.DropAllRows(ctx, "mytable"); err != nil { 1177 t.Errorf("DropAllRows mytable: %v", err) 1178 } 1179 1180 gotRowCount = 0 1181 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1182 gotRowCount++ 1183 return true 1184 })) 1185 if gotRowCount != 0 { 1186 t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0) 1187 } 1188} 1189 1190func TestIntegration_TableIam(t *testing.T) { 1191 testEnv, err := NewIntegrationEnv() 1192 if err != nil { 1193 t.Fatalf("IntegrationEnv: %v", err) 1194 } 1195 defer testEnv.Close() 1196 1197 if !testEnv.Config().UseProd { 1198 t.Skip("emulator doesn't support IAM Policy creation") 1199 } 1200 1201 timeout := 5 * time.Minute 1202 ctx, _ := context.WithTimeout(context.Background(), timeout) 1203 1204 adminClient, err := testEnv.NewAdminClient() 1205 if err != nil { 1206 t.Fatalf("NewAdminClient: %v", err) 1207 } 1208 defer adminClient.Close() 1209 1210 defer adminClient.DeleteTable(ctx, "mytable") 1211 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1212 t.Fatalf("Creating table: %v", err) 1213 } 1214 1215 // Verify that the IAM Controls work for Tables. 1216 iamHandle := adminClient.TableIAM("mytable") 1217 p, err := iamHandle.Policy(ctx) 1218 if err != nil { 1219 t.Errorf("Iam GetPolicy mytable: %v", err) 1220 } 1221 if err = iamHandle.SetPolicy(ctx, p); err != nil { 1222 t.Errorf("Iam SetPolicy mytable: %v", err) 1223 } 1224 if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil { 1225 t.Errorf("Iam TestPermissions mytable: %v", err) 1226 } 1227} 1228 1229func TestIntegration_AdminCreateInstance(t *testing.T) { 1230 if instanceToCreate == "" { 1231 t.Skip("instanceToCreate not set, skipping instance creation testing") 1232 } 1233 1234 testEnv, err := NewIntegrationEnv() 1235 if err != nil { 1236 t.Fatalf("IntegrationEnv: %v", err) 1237 } 1238 defer testEnv.Close() 1239 1240 if !testEnv.Config().UseProd { 1241 t.Skip("emulator doesn't support instance creation") 1242 } 1243 1244 timeout := 5 * time.Minute 1245 ctx, _ := context.WithTimeout(context.Background(), timeout) 1246 1247 iAdminClient, err := testEnv.NewInstanceAdminClient() 1248 if err != nil { 1249 t.Fatalf("NewInstanceAdminClient: %v", err) 1250 } 1251 defer iAdminClient.Close() 1252 1253 clusterID := instanceToCreate + "-cluster" 1254 1255 // Create a development instance 1256 conf := &InstanceConf{ 1257 InstanceId: instanceToCreate, 1258 ClusterId: clusterID, 1259 DisplayName: "test instance", 1260 Zone: instanceToCreateZone, 1261 InstanceType: DEVELOPMENT, 1262 } 1263 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1264 t.Fatalf("CreateInstance: %v", err) 1265 } 1266 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1267 1268 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1269 if err != nil { 1270 t.Fatalf("InstanceInfo: %v", err) 1271 } 1272 1273 // Basic return values are tested elsewhere, check instance type 1274 if iInfo.InstanceType != DEVELOPMENT { 1275 t.Fatalf("Instance is not DEVELOPMENT: %v", err) 1276 } 1277 1278 // Update everything we can about the instance in one call. 1279 confWithClusters := &InstanceWithClustersConfig{ 1280 InstanceID: instanceToCreate, 1281 DisplayName: "new display name", 1282 InstanceType: PRODUCTION, 1283 Clusters: []ClusterConfig{ 1284 {ClusterID: clusterID, NumNodes: 5, StorageType: HDD}}, 1285 } 1286 1287 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1288 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1289 } 1290 1291 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1292 if err != nil { 1293 t.Fatalf("InstanceInfo: %v", err) 1294 } 1295 1296 if iInfo.InstanceType != PRODUCTION { 1297 t.Fatalf("Instance type is not PRODUCTION: %v", err) 1298 } 1299 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1300 t.Fatalf("Display name: %q, want: %q", got, want) 1301 } 1302 1303 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1304 if err != nil { 1305 t.Fatalf("GetCluster: %v", err) 1306 } 1307 1308 if cInfo.ServeNodes != 5 { 1309 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1310 } 1311 1312 if cInfo.StorageType != HDD { 1313 t.Fatalf("StorageType: %v, want: %v", cInfo.StorageType, HDD) 1314 } 1315} 1316 1317func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) { 1318 if instanceToCreate == "" { 1319 t.Skip("instanceToCreate not set, skipping instance update testing") 1320 } 1321 1322 testEnv, err := NewIntegrationEnv() 1323 if err != nil { 1324 t.Fatalf("IntegrationEnv: %v", err) 1325 } 1326 defer testEnv.Close() 1327 1328 if !testEnv.Config().UseProd { 1329 t.Skip("emulator doesn't support instance creation") 1330 } 1331 1332 timeout := 5 * time.Minute 1333 ctx, _ := context.WithTimeout(context.Background(), timeout) 1334 1335 iAdminClient, err := testEnv.NewInstanceAdminClient() 1336 if err != nil { 1337 t.Fatalf("NewInstanceAdminClient: %v", err) 1338 } 1339 defer iAdminClient.Close() 1340 1341 clusterID := instanceToCreate + "-cluster" 1342 1343 // Create a development instance 1344 conf := &InstanceConf{ 1345 InstanceId: instanceToCreate, 1346 ClusterId: clusterID, 1347 DisplayName: "test instance", 1348 Zone: instanceToCreateZone, 1349 InstanceType: DEVELOPMENT, 1350 } 1351 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1352 t.Fatalf("CreateInstance: %v", err) 1353 } 1354 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1355 1356 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1357 if err != nil { 1358 t.Fatalf("InstanceInfo: %v", err) 1359 } 1360 1361 // Basic return values are tested elsewhere, check instance type 1362 if iInfo.InstanceType != DEVELOPMENT { 1363 t.Fatalf("Instance is not DEVELOPMENT: %v", err) 1364 } 1365 1366 // Update everything we can about the instance in one call. 1367 confWithClusters := &InstanceWithClustersConfig{ 1368 InstanceID: instanceToCreate, 1369 DisplayName: "new display name", 1370 InstanceType: PRODUCTION, 1371 Clusters: []ClusterConfig{ 1372 {ClusterID: clusterID, NumNodes: 5}}, 1373 } 1374 1375 results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1376 if err != nil { 1377 t.Fatalf("UpdateInstanceAndSyncClusters: %v", err) 1378 } 1379 1380 wantResults := UpdateInstanceResults{ 1381 InstanceUpdated: true, 1382 UpdatedClusters: []string{clusterID}, 1383 } 1384 if diff := testutil.Diff(*results, wantResults); diff != "" { 1385 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1386 } 1387 1388 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1389 if err != nil { 1390 t.Fatalf("InstanceInfo: %v", err) 1391 } 1392 1393 if iInfo.InstanceType != PRODUCTION { 1394 t.Fatalf("Instance type is not PRODUCTION: %v", err) 1395 } 1396 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1397 t.Fatalf("Display name: %q, want: %q", got, want) 1398 } 1399 1400 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1401 if err != nil { 1402 t.Fatalf("GetCluster: %v", err) 1403 } 1404 1405 if cInfo.ServeNodes != 5 { 1406 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1407 } 1408 1409 // Now add a second cluster as the only change. The first cluster must also be provided so 1410 // it is not removed. 1411 clusterID2 := clusterID + "-2" 1412 confWithClusters = &InstanceWithClustersConfig{ 1413 InstanceID: instanceToCreate, 1414 Clusters: []ClusterConfig{ 1415 {ClusterID: clusterID}, 1416 {ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}}, 1417 } 1418 1419 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1420 if err != nil { 1421 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1422 } 1423 1424 wantResults = UpdateInstanceResults{ 1425 InstanceUpdated: false, 1426 CreatedClusters: []string{clusterID2}, 1427 } 1428 if diff := testutil.Diff(*results, wantResults); diff != "" { 1429 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1430 } 1431 1432 // Now update one cluster and delete the other 1433 confWithClusters = &InstanceWithClustersConfig{ 1434 InstanceID: instanceToCreate, 1435 Clusters: []ClusterConfig{ 1436 {ClusterID: clusterID, NumNodes: 4}}, 1437 } 1438 1439 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1440 if err != nil { 1441 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1442 } 1443 1444 wantResults = UpdateInstanceResults{ 1445 InstanceUpdated: false, 1446 UpdatedClusters: []string{clusterID}, 1447 DeletedClusters: []string{clusterID2}, 1448 } 1449 1450 if diff := testutil.Diff(*results, wantResults); diff != "" { 1451 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1452 } 1453 1454 // Make sure the instance looks as we would expect 1455 clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId) 1456 if err != nil { 1457 t.Fatalf("Clusters: %v", err) 1458 } 1459 1460 if len(clusters) != 1 { 1461 t.Fatalf("Clusters length %v, want: 1", len(clusters)) 1462 } 1463 1464 wantCluster := &ClusterInfo{ 1465 Name: clusterID, 1466 Zone: instanceToCreateZone, 1467 ServeNodes: 4, 1468 State: "READY", 1469 } 1470 if diff := testutil.Diff(clusters[0], wantCluster); diff != "" { 1471 t.Fatalf("InstanceEquality: got - want +\n%s", diff) 1472 } 1473} 1474 1475func TestIntegration_AdminSnapshot(t *testing.T) { 1476 testEnv, err := NewIntegrationEnv() 1477 if err != nil { 1478 t.Fatalf("IntegrationEnv: %v", err) 1479 } 1480 defer testEnv.Close() 1481 1482 if !testEnv.Config().UseProd { 1483 t.Skip("emulator doesn't support snapshots") 1484 } 1485 1486 timeout := 2 * time.Second 1487 if testEnv.Config().UseProd { 1488 timeout = 5 * time.Minute 1489 } 1490 ctx, _ := context.WithTimeout(context.Background(), timeout) 1491 1492 adminClient, err := testEnv.NewAdminClient() 1493 if err != nil { 1494 t.Fatalf("NewAdminClient: %v", err) 1495 } 1496 defer adminClient.Close() 1497 1498 table := testEnv.Config().Table 1499 cluster := testEnv.Config().Cluster 1500 1501 list := func(cluster string) ([]*SnapshotInfo, error) { 1502 infos := []*SnapshotInfo(nil) 1503 1504 it := adminClient.Snapshots(ctx, cluster) 1505 for { 1506 s, err := it.Next() 1507 if err == iterator.Done { 1508 break 1509 } 1510 if err != nil { 1511 return nil, err 1512 } 1513 infos = append(infos, s) 1514 } 1515 return infos, err 1516 } 1517 1518 // Delete the table at the end of the test. Schedule ahead of time 1519 // in case the client fails 1520 defer adminClient.DeleteTable(ctx, table) 1521 1522 if err := adminClient.CreateTable(ctx, table); err != nil { 1523 t.Fatalf("Creating table: %v", err) 1524 } 1525 1526 // Precondition: no snapshots 1527 snapshots, err := list(cluster) 1528 if err != nil { 1529 t.Fatalf("Initial snapshot list: %v", err) 1530 } 1531 if got, want := len(snapshots), 0; got != want { 1532 t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) 1533 } 1534 1535 // Create snapshot 1536 defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") 1537 1538 if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { 1539 t.Fatalf("Creating snaphot: %v", err) 1540 } 1541 1542 // List snapshot 1543 snapshots, err = list(cluster) 1544 if err != nil { 1545 t.Fatalf("Listing snapshots: %v", err) 1546 } 1547 if got, want := len(snapshots), 1; got != want { 1548 t.Fatalf("Listing snapshot count: %d, want: %d", got, want) 1549 } 1550 if got, want := snapshots[0].Name, "mysnapshot"; got != want { 1551 t.Fatalf("Snapshot name: %s, want: %s", got, want) 1552 } 1553 if got, want := snapshots[0].SourceTable, table; got != want { 1554 t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) 1555 } 1556 if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 1557 t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) 1558 } 1559 1560 // Get snapshot 1561 snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") 1562 if err != nil { 1563 t.Fatalf("SnapshotInfo: %v", snapshot) 1564 } 1565 if got, want := *snapshot, *snapshots[0]; got != want { 1566 t.Fatalf("SnapshotInfo: %v, want: %v", got, want) 1567 } 1568 1569 // Restore 1570 restoredTable := table + "-restored" 1571 defer adminClient.DeleteTable(ctx, restoredTable) 1572 if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil { 1573 t.Fatalf("CreateTableFromSnapshot: %v", err) 1574 } 1575 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 1576 t.Fatalf("Restored TableInfo: %v", err) 1577 } 1578 1579 // Delete snapshot 1580 if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { 1581 t.Fatalf("DeleteSnapshot: %v", err) 1582 } 1583 snapshots, err = list(cluster) 1584 if err != nil { 1585 t.Fatalf("List after Delete: %v", err) 1586 } 1587 if got, want := len(snapshots), 0; got != want { 1588 t.Fatalf("List after delete len: %d, want: %d", got, want) 1589 } 1590} 1591 1592func TestIntegration_Granularity(t *testing.T) { 1593 testEnv, err := NewIntegrationEnv() 1594 if err != nil { 1595 t.Fatalf("IntegrationEnv: %v", err) 1596 } 1597 defer testEnv.Close() 1598 1599 timeout := 2 * time.Second 1600 if testEnv.Config().UseProd { 1601 timeout = 5 * time.Minute 1602 } 1603 ctx, _ := context.WithTimeout(context.Background(), timeout) 1604 1605 adminClient, err := testEnv.NewAdminClient() 1606 if err != nil { 1607 t.Fatalf("NewAdminClient: %v", err) 1608 } 1609 defer adminClient.Close() 1610 1611 list := func() []string { 1612 tbls, err := adminClient.Tables(ctx) 1613 if err != nil { 1614 t.Fatalf("Fetching list of tables: %v", err) 1615 } 1616 sort.Strings(tbls) 1617 return tbls 1618 } 1619 containsAll := func(got, want []string) bool { 1620 gotSet := make(map[string]bool) 1621 1622 for _, s := range got { 1623 gotSet[s] = true 1624 } 1625 for _, s := range want { 1626 if !gotSet[s] { 1627 return false 1628 } 1629 } 1630 return true 1631 } 1632 1633 defer adminClient.DeleteTable(ctx, "mytable") 1634 1635 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1636 t.Fatalf("Creating table: %v", err) 1637 } 1638 1639 tables := list() 1640 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1641 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1642 } 1643 1644 // calling ModifyColumnFamilies to check the granularity of table 1645 prefix := adminClient.instancePrefix() 1646 req := &btapb.ModifyColumnFamiliesRequest{ 1647 Name: prefix + "/tables/" + "mytable", 1648 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 1649 Id: "cf", 1650 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, 1651 }}, 1652 } 1653 table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req) 1654 if err != nil { 1655 t.Fatalf("Creating column family: %v", err) 1656 } 1657 if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { 1658 t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) 1659 } 1660} 1661 1662func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { 1663 testEnv, err := NewIntegrationEnv() 1664 if err != nil { 1665 t.Fatalf("IntegrationEnv: %v", err) 1666 } 1667 defer testEnv.Close() 1668 1669 timeout := 2 * time.Second 1670 if testEnv.Config().UseProd { 1671 timeout = 5 * time.Minute 1672 } 1673 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1674 defer cancel() 1675 1676 adminClient, err := testEnv.NewAdminClient() 1677 if err != nil { 1678 t.Fatalf("NewAdminClient: %v", err) 1679 } 1680 defer adminClient.Close() 1681 1682 iAdminClient, err := testEnv.NewInstanceAdminClient() 1683 if err != nil { 1684 t.Fatalf("NewInstanceAdminClient: %v", err) 1685 } 1686 1687 if iAdminClient == nil { 1688 return 1689 } 1690 1691 defer iAdminClient.Close() 1692 profile := ProfileConf{ 1693 ProfileID: "app_profile1", 1694 InstanceID: adminClient.instance, 1695 ClusterID: testEnv.Config().Cluster, 1696 Description: "creating new app profile 1", 1697 RoutingPolicy: SingleClusterRouting, 1698 } 1699 1700 createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) 1701 if err != nil { 1702 t.Fatalf("Creating app profile: %v", err) 1703 1704 } 1705 1706 gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1707 1708 if err != nil { 1709 t.Fatalf("Get app profile: %v", err) 1710 } 1711 1712 if !proto.Equal(createdProfile, gotProfile) { 1713 t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) 1714 1715 } 1716 1717 list := func(instanceID string) ([]*btapb.AppProfile, error) { 1718 profiles := []*btapb.AppProfile(nil) 1719 1720 it := iAdminClient.ListAppProfiles(ctx, instanceID) 1721 for { 1722 s, err := it.Next() 1723 if err == iterator.Done { 1724 break 1725 } 1726 if err != nil { 1727 return nil, err 1728 } 1729 profiles = append(profiles, s) 1730 } 1731 return profiles, err 1732 } 1733 1734 profiles, err := list(adminClient.instance) 1735 if err != nil { 1736 t.Fatalf("List app profile: %v", err) 1737 } 1738 1739 if got, want := len(profiles), 1; got != want { 1740 t.Fatalf("Initial app profile list len: %d, want: %d", got, want) 1741 } 1742 1743 for _, test := range []struct { 1744 desc string 1745 uattrs ProfileAttrsToUpdate 1746 want *btapb.AppProfile // nil means error 1747 }{ 1748 { 1749 desc: "empty update", 1750 uattrs: ProfileAttrsToUpdate{}, 1751 want: nil, 1752 }, 1753 1754 { 1755 desc: "empty description update", 1756 uattrs: ProfileAttrsToUpdate{Description: ""}, 1757 want: &btapb.AppProfile{ 1758 Name: gotProfile.Name, 1759 Description: "", 1760 RoutingPolicy: gotProfile.RoutingPolicy, 1761 Etag: gotProfile.Etag}, 1762 }, 1763 { 1764 desc: "routing update", 1765 uattrs: ProfileAttrsToUpdate{ 1766 RoutingPolicy: SingleClusterRouting, 1767 ClusterID: testEnv.Config().Cluster, 1768 }, 1769 want: &btapb.AppProfile{ 1770 Name: gotProfile.Name, 1771 Description: "", 1772 Etag: gotProfile.Etag, 1773 RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ 1774 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1775 ClusterId: testEnv.Config().Cluster, 1776 }}, 1777 }, 1778 }, 1779 } { 1780 err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) 1781 if err != nil { 1782 if test.want != nil { 1783 t.Errorf("%s: %v", test.desc, err) 1784 } 1785 continue 1786 } 1787 if err == nil && test.want == nil { 1788 t.Errorf("%s: got nil, want error", test.desc) 1789 continue 1790 } 1791 1792 got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1793 1794 if !proto.Equal(got, test.want) { 1795 t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want) 1796 } 1797 1798 } 1799 1800 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 1801 if err != nil { 1802 t.Fatalf("Delete app profile: %v", err) 1803 } 1804 1805} 1806 1807func TestIntegration_InstanceUpdate(t *testing.T) { 1808 testEnv, err := NewIntegrationEnv() 1809 if err != nil { 1810 t.Fatalf("IntegrationEnv: %v", err) 1811 } 1812 defer testEnv.Close() 1813 1814 timeout := 2 * time.Second 1815 if testEnv.Config().UseProd { 1816 timeout = 5 * time.Minute 1817 } 1818 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1819 defer cancel() 1820 1821 adminClient, err := testEnv.NewAdminClient() 1822 if err != nil { 1823 t.Fatalf("NewAdminClient: %v", err) 1824 } 1825 1826 defer adminClient.Close() 1827 1828 iAdminClient, err := testEnv.NewInstanceAdminClient() 1829 if err != nil { 1830 t.Fatalf("NewInstanceAdminClient: %v", err) 1831 } 1832 1833 if iAdminClient == nil { 1834 return 1835 } 1836 1837 defer iAdminClient.Close() 1838 1839 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1840 if err != nil { 1841 t.Errorf("InstanceInfo: %v", err) 1842 } 1843 1844 if iInfo.Name != adminClient.instance { 1845 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1846 } 1847 1848 if iInfo.DisplayName != adminClient.instance { 1849 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1850 } 1851 1852 const numNodes = 4 1853 // update cluster nodes 1854 if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { 1855 t.Errorf("UpdateCluster: %v", err) 1856 } 1857 1858 // get cluster after updating 1859 cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster) 1860 if err != nil { 1861 t.Errorf("GetCluster %v", err) 1862 } 1863 if cis.ServeNodes != int(numNodes) { 1864 t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes)) 1865 } 1866} 1867 1868func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { 1869 testEnv, err := NewIntegrationEnv() 1870 if err != nil { 1871 return nil, nil, nil, "", nil, err 1872 } 1873 1874 var timeout time.Duration 1875 if testEnv.Config().UseProd { 1876 timeout = 10 * time.Minute 1877 t.Logf("Running test against production") 1878 } else { 1879 timeout = 5 * time.Minute 1880 t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) 1881 } 1882 ctx, cancel := context.WithTimeout(ctx, timeout) 1883 1884 client, err := testEnv.NewClient() 1885 if err != nil { 1886 return nil, nil, nil, "", nil, err 1887 } 1888 1889 adminClient, err := testEnv.NewAdminClient() 1890 if err != nil { 1891 return nil, nil, nil, "", nil, err 1892 } 1893 1894 if testEnv.Config().UseProd { 1895 // TODO: tables may not be successfully deleted in some cases, and will 1896 // become obsolete. We may need a way to automatically delete them. 1897 tableName = tableNameSpace.New() 1898 } else { 1899 tableName = testEnv.Config().Table 1900 } 1901 1902 if err := adminClient.CreateTable(ctx, tableName); err != nil { 1903 cancel() 1904 return nil, nil, nil, "", nil, err 1905 } 1906 if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { 1907 cancel() 1908 return nil, nil, nil, "", nil, err 1909 } 1910 1911 return client, adminClient, client.Open(tableName), tableName, func() { 1912 if err := adminClient.DeleteTable(ctx, tableName); err != nil { 1913 t.Errorf("DeleteTable got error %v", err) 1914 } 1915 cancel() 1916 client.Close() 1917 adminClient.Close() 1918 }, nil 1919} 1920 1921func formatReadItem(ri ReadItem) string { 1922 // Use the column qualifier only to make the test data briefer. 1923 col := ri.Column[strings.Index(ri.Column, ":")+1:] 1924 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 1925} 1926 1927func fill(b, sub []byte) { 1928 for len(b) > len(sub) { 1929 n := copy(b, sub) 1930 b = b[n:] 1931 } 1932} 1933 1934func clearTimestamps(r Row) { 1935 for _, ris := range r { 1936 for i := range ris { 1937 ris[i].Timestamp = 0 1938 } 1939 } 1940} 1941