1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "flag" 22 "fmt" 23 "math" 24 "math/rand" 25 "reflect" 26 "sort" 27 "strings" 28 "sync" 29 "testing" 30 "time" 31 32 "cloud.google.com/go/internal" 33 "cloud.google.com/go/internal/testutil" 34 "cloud.google.com/go/internal/uid" 35 "github.com/golang/protobuf/proto" 36 "github.com/google/go-cmp/cmp" 37 gax "github.com/googleapis/gax-go/v2" 38 "google.golang.org/api/iterator" 39 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 40) 41 42var ( 43 presidentsSocialGraph = map[string][]string{ 44 "wmckinley": {"tjefferson"}, 45 "gwashington": {"jadams"}, 46 "tjefferson": {"gwashington", "jadams"}, 47 "jadams": {"gwashington", "tjefferson"}, 48 } 49 50 tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true}) 51) 52 53func populatePresidentsGraph(table *Table) error { 54 ctx := context.Background() 55 for row, ss := range presidentsSocialGraph { 56 mut := NewMutation() 57 for _, name := range ss { 58 mut.Set("follows", name, 1000, []byte("1")) 59 } 60 if err := table.Apply(ctx, row, mut); err != nil { 61 return fmt.Errorf("Mutating row %q: %v", row, err) 62 } 63 } 64 return nil 65} 66 67var instanceToCreate string 68var instanceToCreateZone string 69var instanceToCreateZone2 string 70 71func init() { 72 // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. 73 flag.StringVar(&instanceToCreate, "it.instance-to-create", "", 74 "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.") 75 flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b", 76 "The zone in which to create the new test instance.") 77 flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", 78 "The zone in which to create a second cluster in the test instance.") 79} 80 81func TestIntegration_ConditionalMutations(t *testing.T) { 82 ctx := context.Background() 83 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 84 if err != nil { 85 t.Fatal(err) 86 } 87 defer cleanup() 88 89 if err := populatePresidentsGraph(table); err != nil { 90 t.Fatal(err) 91 } 92 93 // Do a conditional mutation with a complex filter. 94 mutTrue := NewMutation() 95 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 96 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 97 mut := NewCondMutation(filter, mutTrue, nil) 98 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 99 t.Fatalf("Conditionally mutating row: %v", err) 100 } 101 // Do a second condition mutation with a filter that does not match, 102 // and thus no changes should be made. 103 mutTrue = NewMutation() 104 mutTrue.DeleteRow() 105 filter = ColumnFilter("snoop.dogg") 106 mut = NewCondMutation(filter, mutTrue, nil) 107 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 108 t.Fatalf("Conditionally mutating row: %v", err) 109 } 110 111 // Fetch a row. 112 row, err := table.ReadRow(ctx, "jadams") 113 if err != nil { 114 t.Fatalf("Reading a row: %v", err) 115 } 116 wantRow := Row{ 117 "follows": []ReadItem{ 118 {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 119 {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 120 }, 121 } 122 if !testutil.Equal(row, wantRow) { 123 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 124 } 125} 126 127func TestIntegration_PartialReadRows(t *testing.T) { 128 ctx := context.Background() 129 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 130 if err != nil { 131 t.Fatal(err) 132 } 133 defer cleanup() 134 135 if err := populatePresidentsGraph(table); err != nil { 136 t.Fatal(err) 137 } 138 139 // Do a scan and stop part way through. 140 // Verify that the ReadRows callback doesn't keep running. 141 stopped := false 142 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 143 if r.Key() < "h" { 144 return true 145 } 146 if !stopped { 147 stopped = true 148 return false 149 } 150 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 151 return false 152 }) 153 if err != nil { 154 t.Fatalf("Partial ReadRows: %v", err) 155 } 156} 157 158func TestIntegration_ReadRowList(t *testing.T) { 159 ctx := context.Background() 160 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 161 if err != nil { 162 t.Fatal(err) 163 } 164 defer cleanup() 165 166 if err := populatePresidentsGraph(table); err != nil { 167 t.Fatal(err) 168 } 169 170 // Read a RowList 171 var elt []string 172 keys := RowList{"wmckinley", "gwashington", "jadams"} 173 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 174 err = table.ReadRows(ctx, keys, func(r Row) bool { 175 for _, ris := range r { 176 for _, ri := range ris { 177 elt = append(elt, formatReadItem(ri)) 178 } 179 } 180 return true 181 }) 182 if err != nil { 183 t.Fatalf("read RowList: %v", err) 184 } 185 186 if got := strings.Join(elt, ","); got != want { 187 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 188 } 189} 190 191func TestIntegration_DeleteRow(t *testing.T) { 192 ctx := context.Background() 193 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 194 if err != nil { 195 t.Fatal(err) 196 } 197 defer cleanup() 198 199 if err := populatePresidentsGraph(table); err != nil { 200 t.Fatal(err) 201 } 202 203 // Delete a row and check it goes away. 204 mut := NewMutation() 205 mut.DeleteRow() 206 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 207 t.Fatalf("Apply DeleteRow: %v", err) 208 } 209 row, err := table.ReadRow(ctx, "wmckinley") 210 if err != nil { 211 t.Fatalf("Reading a row after DeleteRow: %v", err) 212 } 213 if len(row) != 0 { 214 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 215 } 216} 217 218func TestIntegration_ReadModifyWrite(t *testing.T) { 219 ctx := context.Background() 220 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 221 if err != nil { 222 t.Fatal(err) 223 } 224 defer cleanup() 225 226 if err := populatePresidentsGraph(table); err != nil { 227 t.Fatal(err) 228 } 229 230 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 231 t.Fatalf("Creating column family: %v", err) 232 } 233 234 appendRMW := func(b []byte) *ReadModifyWrite { 235 rmw := NewReadModifyWrite() 236 rmw.AppendValue("counter", "likes", b) 237 return rmw 238 } 239 incRMW := func(n int64) *ReadModifyWrite { 240 rmw := NewReadModifyWrite() 241 rmw.Increment("counter", "likes", n) 242 return rmw 243 } 244 rmwSeq := []struct { 245 desc string 246 rmw *ReadModifyWrite 247 want []byte 248 }{ 249 { 250 desc: "append #1", 251 rmw: appendRMW([]byte{0, 0, 0}), 252 want: []byte{0, 0, 0}, 253 }, 254 { 255 desc: "append #2", 256 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 257 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 258 }, 259 { 260 desc: "increment", 261 rmw: incRMW(8), 262 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 263 }, 264 } 265 for _, step := range rmwSeq { 266 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 267 if err != nil { 268 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 269 } 270 // Make sure the modified cell returned by the RMW operation has a timestamp. 271 if row["counter"][0].Timestamp == 0 { 272 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 273 } 274 clearTimestamps(row) 275 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 276 if !testutil.Equal(row, wantRow) { 277 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 278 } 279 } 280 281 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 282 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 283 if err != nil { 284 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 285 } 286 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 287 if err != nil { 288 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 289 } 290 // Get only the correct row back on read. 291 r, err := table.ReadRow(ctx, "issue-723-1") 292 if err != nil { 293 t.Fatalf("Reading row: %v", err) 294 } 295 if r.Key() != "issue-723-1" { 296 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 297 } 298} 299 300func TestIntegration_ArbitraryTimestamps(t *testing.T) { 301 ctx := context.Background() 302 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 303 if err != nil { 304 t.Fatal(err) 305 } 306 defer cleanup() 307 308 // Test arbitrary timestamps more thoroughly. 309 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 310 t.Fatalf("Creating column family: %v", err) 311 } 312 const numVersions = 4 313 mut := NewMutation() 314 for i := 1; i < numVersions; i++ { 315 // Timestamps are used in thousands because the server 316 // only permits that granularity. 317 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 318 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 319 } 320 if err := table.Apply(ctx, "testrow", mut); err != nil { 321 t.Fatalf("Mutating row: %v", err) 322 } 323 r, err := table.ReadRow(ctx, "testrow") 324 if err != nil { 325 t.Fatalf("Reading row: %v", err) 326 } 327 wantRow := Row{"ts": []ReadItem{ 328 // These should be returned in descending timestamp order. 329 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 330 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 331 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 332 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 333 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 334 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 335 }} 336 if !testutil.Equal(r, wantRow) { 337 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 338 } 339 340 // Do the same read, but filter to the latest two versions. 341 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 342 if err != nil { 343 t.Fatalf("Reading row: %v", err) 344 } 345 wantRow = Row{"ts": []ReadItem{ 346 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 347 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 348 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 349 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 350 }} 351 if !testutil.Equal(r, wantRow) { 352 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 353 } 354 // Check cell offset / limit 355 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 356 if err != nil { 357 t.Fatalf("Reading row: %v", err) 358 } 359 wantRow = Row{"ts": []ReadItem{ 360 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 361 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 362 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 363 }} 364 if !testutil.Equal(r, wantRow) { 365 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 366 } 367 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) 368 if err != nil { 369 t.Fatalf("Reading row: %v", err) 370 } 371 wantRow = Row{"ts": []ReadItem{ 372 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 373 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 374 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 375 }} 376 if !testutil.Equal(r, wantRow) { 377 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 378 } 379 // Check timestamp range filtering (with truncation) 380 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 381 if err != nil { 382 t.Fatalf("Reading row: %v", err) 383 } 384 wantRow = Row{"ts": []ReadItem{ 385 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 386 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 387 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 388 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 389 }} 390 if !testutil.Equal(r, wantRow) { 391 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 392 } 393 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 394 if err != nil { 395 t.Fatalf("Reading row: %v", err) 396 } 397 wantRow = Row{"ts": []ReadItem{ 398 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 399 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 400 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 401 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 402 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 403 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 404 }} 405 if !testutil.Equal(r, wantRow) { 406 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 407 } 408 // Delete non-existing cells, no such column family in this row 409 // Should not delete anything 410 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 411 t.Fatalf("Creating column family: %v", err) 412 } 413 mut = NewMutation() 414 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 415 if err := table.Apply(ctx, "testrow", mut); err != nil { 416 t.Fatalf("Mutating row: %v", err) 417 } 418 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 419 if err != nil { 420 t.Fatalf("Reading row: %v", err) 421 } 422 if !testutil.Equal(r, wantRow) { 423 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 424 } 425 // Delete non-existing cells, no such column in this column family 426 // Should not delete anything 427 mut = NewMutation() 428 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 429 if err := table.Apply(ctx, "testrow", mut); err != nil { 430 t.Fatalf("Mutating row: %v", err) 431 } 432 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 433 if err != nil { 434 t.Fatalf("Reading row: %v", err) 435 } 436 if !testutil.Equal(r, wantRow) { 437 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 438 } 439 // Delete the cell with timestamp 2000 and repeat the last read, 440 // checking that we get ts 3000 and ts 1000. 441 mut = NewMutation() 442 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 443 if err := table.Apply(ctx, "testrow", mut); err != nil { 444 t.Fatalf("Mutating row: %v", err) 445 } 446 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 447 if err != nil { 448 t.Fatalf("Reading row: %v", err) 449 } 450 wantRow = Row{"ts": []ReadItem{ 451 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 452 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 453 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 454 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 455 }} 456 if !testutil.Equal(r, wantRow) { 457 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 458 } 459 460 // Check DeleteCellsInFamily 461 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 462 t.Fatalf("Creating column family: %v", err) 463 } 464 465 mut = NewMutation() 466 mut.Set("status", "start", 2000, []byte("2")) 467 mut.Set("status", "end", 3000, []byte("3")) 468 mut.Set("ts", "col", 1000, []byte("1")) 469 if err := table.Apply(ctx, "row1", mut); err != nil { 470 t.Fatalf("Mutating row: %v", err) 471 } 472 if err := table.Apply(ctx, "row2", mut); err != nil { 473 t.Fatalf("Mutating row: %v", err) 474 } 475 476 mut = NewMutation() 477 mut.DeleteCellsInFamily("status") 478 if err := table.Apply(ctx, "row1", mut); err != nil { 479 t.Fatalf("Delete cf: %v", err) 480 } 481 482 // ColumnFamily removed 483 r, err = table.ReadRow(ctx, "row1") 484 if err != nil { 485 t.Fatalf("Reading row: %v", err) 486 } 487 wantRow = Row{"ts": []ReadItem{ 488 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 489 }} 490 if !testutil.Equal(r, wantRow) { 491 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 492 } 493 494 // ColumnFamily not removed 495 r, err = table.ReadRow(ctx, "row2") 496 if err != nil { 497 t.Fatalf("Reading row: %v", err) 498 } 499 wantRow = Row{ 500 "ts": []ReadItem{ 501 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 502 }, 503 "status": []ReadItem{ 504 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 505 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 506 }, 507 } 508 if !testutil.Equal(r, wantRow) { 509 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 510 } 511 512 // Check DeleteCellsInColumn 513 mut = NewMutation() 514 mut.Set("status", "start", 2000, []byte("2")) 515 mut.Set("status", "middle", 3000, []byte("3")) 516 mut.Set("status", "end", 1000, []byte("1")) 517 if err := table.Apply(ctx, "row3", mut); err != nil { 518 t.Fatalf("Mutating row: %v", err) 519 } 520 mut = NewMutation() 521 mut.DeleteCellsInColumn("status", "middle") 522 if err := table.Apply(ctx, "row3", mut); err != nil { 523 t.Fatalf("Delete column: %v", err) 524 } 525 r, err = table.ReadRow(ctx, "row3") 526 if err != nil { 527 t.Fatalf("Reading row: %v", err) 528 } 529 wantRow = Row{ 530 "status": []ReadItem{ 531 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 532 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 533 }, 534 } 535 if !testutil.Equal(r, wantRow) { 536 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 537 } 538 mut = NewMutation() 539 mut.DeleteCellsInColumn("status", "start") 540 if err := table.Apply(ctx, "row3", mut); err != nil { 541 t.Fatalf("Delete column: %v", err) 542 } 543 r, err = table.ReadRow(ctx, "row3") 544 if err != nil { 545 t.Fatalf("Reading row: %v", err) 546 } 547 wantRow = Row{ 548 "status": []ReadItem{ 549 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 550 }, 551 } 552 if !testutil.Equal(r, wantRow) { 553 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 554 } 555 mut = NewMutation() 556 mut.DeleteCellsInColumn("status", "end") 557 if err := table.Apply(ctx, "row3", mut); err != nil { 558 t.Fatalf("Delete column: %v", err) 559 } 560 r, err = table.ReadRow(ctx, "row3") 561 if err != nil { 562 t.Fatalf("Reading row: %v", err) 563 } 564 if len(r) != 0 { 565 t.Fatalf("Delete column: got %v, want empty row", r) 566 } 567 // Add same cell after delete 568 mut = NewMutation() 569 mut.Set("status", "end", 1000, []byte("1")) 570 if err := table.Apply(ctx, "row3", mut); err != nil { 571 t.Fatalf("Mutating row: %v", err) 572 } 573 r, err = table.ReadRow(ctx, "row3") 574 if err != nil { 575 t.Fatalf("Reading row: %v", err) 576 } 577 if !testutil.Equal(r, wantRow) { 578 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 579 } 580} 581 582func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 583 ctx := context.Background() 584 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 585 if err != nil { 586 t.Fatal(err) 587 } 588 defer cleanup() 589 590 if err := populatePresidentsGraph(table); err != nil { 591 t.Fatal(err) 592 } 593 594 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 595 t.Fatalf("Creating column family: %v", err) 596 } 597 598 // Do highly concurrent reads/writes. 599 const maxConcurrency = 1000 600 var wg sync.WaitGroup 601 for i := 0; i < maxConcurrency; i++ { 602 wg.Add(1) 603 go func() { 604 defer wg.Done() 605 switch r := rand.Intn(100); { // r ∈ [0,100) 606 case 0 <= r && r < 30: 607 // Do a read. 608 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 609 if err != nil { 610 t.Errorf("Concurrent read: %v", err) 611 } 612 case 30 <= r && r < 100: 613 // Do a write. 614 mut := NewMutation() 615 mut.Set("ts", "col", 1000, []byte("data")) 616 if err := table.Apply(ctx, "testrow", mut); err != nil { 617 t.Errorf("Concurrent write: %v", err) 618 } 619 } 620 }() 621 } 622 wg.Wait() 623} 624 625func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 626 ctx := context.Background() 627 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 628 if err != nil { 629 t.Fatal(err) 630 } 631 defer cleanup() 632 633 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 634 t.Fatalf("Creating column family: %v", err) 635 } 636 637 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 638 nonsense := []byte("lorem ipsum dolor sit amet, ") 639 fill(bigBytes, nonsense) 640 mut := NewMutation() 641 mut.Set("ts", "col", 1000, bigBytes) 642 if err := table.Apply(ctx, "bigrow", mut); err != nil { 643 t.Fatalf("Big write: %v", err) 644 } 645 r, err := table.ReadRow(ctx, "bigrow") 646 if err != nil { 647 t.Fatalf("Big read: %v", err) 648 } 649 wantRow := Row{"ts": []ReadItem{ 650 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 651 }} 652 if !testutil.Equal(r, wantRow) { 653 t.Fatalf("Big read returned incorrect bytes: %v", r) 654 } 655 656 var wg sync.WaitGroup 657 // Now write 1000 rows, each with 82 KB values, then scan them all. 658 medBytes := make([]byte, 82<<10) 659 fill(medBytes, nonsense) 660 sem := make(chan int, 50) // do up to 50 mutations at a time. 661 for i := 0; i < 1000; i++ { 662 mut := NewMutation() 663 mut.Set("ts", "big-scan", 1000, medBytes) 664 row := fmt.Sprintf("row-%d", i) 665 wg.Add(1) 666 go func() { 667 defer wg.Done() 668 defer func() { <-sem }() 669 sem <- 1 670 if err := table.Apply(ctx, row, mut); err != nil { 671 t.Errorf("Preparing large scan: %v", err) 672 } 673 }() 674 } 675 wg.Wait() 676 n := 0 677 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 678 for _, ris := range r { 679 for _, ri := range ris { 680 n += len(ri.Value) 681 } 682 } 683 return true 684 }, RowFilter(ColumnFilter("big-scan"))) 685 if err != nil { 686 t.Fatalf("Doing large scan: %v", err) 687 } 688 if want := 1000 * len(medBytes); n != want { 689 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 690 } 691 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 692 rc := 0 693 wantRc := 3 694 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 695 rc++ 696 return true 697 }, LimitRows(int64(wantRc))) 698 if err != nil { 699 t.Fatal(err) 700 } 701 if rc != wantRc { 702 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 703 } 704 705 // Test bulk mutations 706 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 707 t.Fatalf("Creating column family: %v", err) 708 } 709 bulkData := map[string][]string{ 710 "red sox": {"2004", "2007", "2013"}, 711 "patriots": {"2001", "2003", "2004", "2014"}, 712 "celtics": {"1981", "1984", "1986", "2008"}, 713 } 714 var rowKeys []string 715 var muts []*Mutation 716 for row, ss := range bulkData { 717 mut := NewMutation() 718 for _, name := range ss { 719 mut.Set("bulk", name, 1000, []byte("1")) 720 } 721 rowKeys = append(rowKeys, row) 722 muts = append(muts, mut) 723 } 724 status, err := table.ApplyBulk(ctx, rowKeys, muts) 725 if err != nil { 726 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 727 } 728 if status != nil { 729 t.Fatalf("non-nil errors: %v", err) 730 } 731 732 // Read each row back 733 for rowKey, ss := range bulkData { 734 row, err := table.ReadRow(ctx, rowKey) 735 if err != nil { 736 t.Fatalf("Reading a bulk row: %v", err) 737 } 738 var wantItems []ReadItem 739 for _, val := range ss { 740 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 741 } 742 wantRow := Row{"bulk": wantItems} 743 if !testutil.Equal(row, wantRow) { 744 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 745 } 746 } 747 748 // Test bulk write errors. 749 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 750 badMut := NewMutation() 751 badMut.Set("badfamily", "col", ServerTime, nil) 752 badMut2 := NewMutation() 753 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 754 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 755 if err != nil { 756 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 757 } 758 if status == nil { 759 t.Fatalf("No errors for bad bulk mutation") 760 } else if status[0] == nil || status[1] == nil { 761 t.Fatalf("No error for bad bulk mutation") 762 } 763} 764 765func TestIntegration_Read(t *testing.T) { 766 ctx := context.Background() 767 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 768 if err != nil { 769 t.Fatal(err) 770 } 771 defer cleanup() 772 773 // Insert some data. 774 initialData := map[string][]string{ 775 "wmckinley": {"tjefferson"}, 776 "gwashington": {"jadams"}, 777 "tjefferson": {"gwashington", "jadams", "wmckinley"}, 778 "jadams": {"gwashington", "tjefferson"}, 779 } 780 for row, ss := range initialData { 781 mut := NewMutation() 782 for _, name := range ss { 783 mut.Set("follows", name, 1000, []byte("1")) 784 } 785 if err := table.Apply(ctx, row, mut); err != nil { 786 t.Fatalf("Mutating row %q: %v", row, err) 787 } 788 } 789 790 for _, test := range []struct { 791 desc string 792 rr RowSet 793 filter Filter // may be nil 794 limit ReadOption // may be nil 795 796 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 797 // and join with a comma. 798 want string 799 wantLabels []string 800 }{ 801 { 802 desc: "read all, unfiltered", 803 rr: RowRange{}, 804 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 805 }, 806 { 807 desc: "read with InfiniteRange, unfiltered", 808 rr: InfiniteRange("tjefferson"), 809 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 810 }, 811 { 812 desc: "read with NewRange, unfiltered", 813 rr: NewRange("gargamel", "hubbard"), 814 want: "gwashington-jadams-1", 815 }, 816 { 817 desc: "read with PrefixRange, unfiltered", 818 rr: PrefixRange("jad"), 819 want: "jadams-gwashington-1,jadams-tjefferson-1", 820 }, 821 { 822 desc: "read with SingleRow, unfiltered", 823 rr: SingleRow("wmckinley"), 824 want: "wmckinley-tjefferson-1", 825 }, 826 { 827 desc: "read all, with ColumnFilter", 828 rr: RowRange{}, 829 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 830 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 831 }, 832 { 833 desc: "read all, with ColumnFilter, prefix", 834 rr: RowRange{}, 835 filter: ColumnFilter("j"), // no matches 836 want: "", 837 }, 838 { 839 desc: "read range, with ColumnRangeFilter", 840 rr: RowRange{}, 841 filter: ColumnRangeFilter("follows", "h", "k"), 842 want: "gwashington-jadams-1,tjefferson-jadams-1", 843 }, 844 { 845 desc: "read range from empty, with ColumnRangeFilter", 846 rr: RowRange{}, 847 filter: ColumnRangeFilter("follows", "", "u"), 848 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 849 }, 850 { 851 desc: "read range from start to empty, with ColumnRangeFilter", 852 rr: RowRange{}, 853 filter: ColumnRangeFilter("follows", "h", ""), 854 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 855 }, 856 { 857 desc: "read with RowKeyFilter", 858 rr: RowRange{}, 859 filter: RowKeyFilter(".*wash.*"), 860 want: "gwashington-jadams-1", 861 }, 862 { 863 desc: "read with RowKeyFilter, prefix", 864 rr: RowRange{}, 865 filter: RowKeyFilter("gwash"), 866 want: "", 867 }, 868 { 869 desc: "read with RowKeyFilter, no matches", 870 rr: RowRange{}, 871 filter: RowKeyFilter(".*xxx.*"), 872 want: "", 873 }, 874 { 875 desc: "read with FamilyFilter, no matches", 876 rr: RowRange{}, 877 filter: FamilyFilter(".*xxx.*"), 878 want: "", 879 }, 880 { 881 desc: "read with ColumnFilter + row limit", 882 rr: RowRange{}, 883 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 884 limit: LimitRows(2), 885 want: "gwashington-jadams-1,jadams-tjefferson-1", 886 }, 887 { 888 desc: "apply labels to the result rows", 889 rr: RowRange{}, 890 filter: LabelFilter("test-label"), 891 limit: LimitRows(2), 892 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 893 wantLabels: []string{"test-label", "test-label", "test-label"}, 894 }, 895 { 896 desc: "read all, strip values", 897 rr: RowRange{}, 898 filter: StripValueFilter(), 899 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 900 }, 901 { 902 desc: "read with ColumnFilter + row limit + strip values", 903 rr: RowRange{}, 904 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" 905 limit: LimitRows(2), 906 want: "gwashington-jadams-,jadams-tjefferson-", 907 }, 908 { 909 desc: "read with condition, strip values on true", 910 rr: RowRange{}, 911 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 912 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 913 }, 914 { 915 desc: "read with condition, strip values on false", 916 rr: RowRange{}, 917 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 918 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 919 }, 920 { 921 desc: "read with ValueRangeFilter + row limit", 922 rr: RowRange{}, 923 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 924 limit: LimitRows(2), 925 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 926 }, 927 { 928 desc: "read with ValueRangeFilter, no match on exclusive end", 929 rr: RowRange{}, 930 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 931 want: "", 932 }, 933 { 934 desc: "read with ValueRangeFilter, no matches", 935 rr: RowRange{}, 936 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 937 want: "", 938 }, 939 { 940 desc: "read with InterleaveFilter, no matches on all filters", 941 rr: RowRange{}, 942 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 943 want: "", 944 }, 945 { 946 desc: "read with InterleaveFilter, no duplicate cells", 947 rr: RowRange{}, 948 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 949 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 950 }, 951 { 952 desc: "read with InterleaveFilter, with duplicate cells", 953 rr: RowRange{}, 954 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 955 want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 956 }, 957 { 958 desc: "read with a RowRangeList and no filter", 959 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 960 want: "gwashington-jadams-1,wmckinley-tjefferson-1", 961 }, 962 { 963 desc: "chain that excludes rows and matches nothing, in a condition", 964 rr: RowRange{}, 965 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 966 want: "", 967 }, 968 { 969 desc: "chain that ends with an interleave that has no match. covers #804", 970 rr: RowRange{}, 971 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 972 want: "", 973 }, 974 } { 975 t.Run(test.desc, func(t *testing.T) { 976 var opts []ReadOption 977 if test.filter != nil { 978 opts = append(opts, RowFilter(test.filter)) 979 } 980 if test.limit != nil { 981 opts = append(opts, test.limit) 982 } 983 var elt, labels []string 984 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 985 for _, ris := range r { 986 for _, ri := range ris { 987 labels = append(labels, ri.Labels...) 988 elt = append(elt, formatReadItem(ri)) 989 } 990 } 991 return true 992 }, opts...) 993 if err != nil { 994 t.Fatal(err) 995 } 996 if got := strings.Join(elt, ","); got != test.want { 997 t.Fatalf("got %q\nwant %q", got, test.want) 998 } 999 if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) { 1000 t.Fatalf("got %q\nwant %q", got, want) 1001 } 1002 }) 1003 } 1004} 1005 1006func TestIntegration_SampleRowKeys(t *testing.T) { 1007 ctx := context.Background() 1008 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 1009 if err != nil { 1010 t.Fatal(err) 1011 } 1012 defer cleanup() 1013 1014 // Insert some data. 1015 initialData := map[string][]string{ 1016 "wmckinley11": {"tjefferson11"}, 1017 "gwashington77": {"jadams77"}, 1018 "tjefferson0": {"gwashington0", "jadams0"}, 1019 } 1020 1021 for row, ss := range initialData { 1022 mut := NewMutation() 1023 for _, name := range ss { 1024 mut.Set("follows", name, 1000, []byte("1")) 1025 } 1026 if err := table.Apply(ctx, row, mut); err != nil { 1027 t.Fatalf("Mutating row %q: %v", row, err) 1028 } 1029 } 1030 sampleKeys, err := table.SampleRowKeys(context.Background()) 1031 if err != nil { 1032 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1033 } 1034 if len(sampleKeys) == 0 { 1035 t.Error("SampleRowKeys length 0") 1036 } 1037} 1038 1039func TestIntegration_Admin(t *testing.T) { 1040 testEnv, err := NewIntegrationEnv() 1041 if err != nil { 1042 t.Fatalf("IntegrationEnv: %v", err) 1043 } 1044 defer testEnv.Close() 1045 1046 timeout := 2 * time.Second 1047 if testEnv.Config().UseProd { 1048 timeout = 5 * time.Minute 1049 } 1050 ctx, _ := context.WithTimeout(context.Background(), timeout) 1051 1052 adminClient, err := testEnv.NewAdminClient() 1053 if err != nil { 1054 t.Fatalf("NewAdminClient: %v", err) 1055 } 1056 defer adminClient.Close() 1057 1058 iAdminClient, err := testEnv.NewInstanceAdminClient() 1059 if err != nil { 1060 t.Fatalf("NewInstanceAdminClient: %v", err) 1061 } 1062 if iAdminClient != nil { 1063 defer iAdminClient.Close() 1064 1065 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1066 if err != nil { 1067 t.Errorf("InstanceInfo: %v", err) 1068 } 1069 if iInfo.Name != adminClient.instance { 1070 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1071 } 1072 } 1073 1074 list := func() []string { 1075 tbls, err := adminClient.Tables(ctx) 1076 if err != nil { 1077 t.Fatalf("Fetching list of tables: %v", err) 1078 } 1079 sort.Strings(tbls) 1080 return tbls 1081 } 1082 containsAll := func(got, want []string) bool { 1083 gotSet := make(map[string]bool) 1084 1085 for _, s := range got { 1086 gotSet[s] = true 1087 } 1088 for _, s := range want { 1089 if !gotSet[s] { 1090 return false 1091 } 1092 } 1093 return true 1094 } 1095 1096 defer deleteTable(ctx, t, adminClient, "mytable") 1097 1098 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1099 t.Fatalf("Creating table: %v", err) 1100 } 1101 1102 defer deleteTable(ctx, t, adminClient, "myothertable") 1103 1104 if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { 1105 t.Fatalf("Creating table: %v", err) 1106 } 1107 1108 if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { 1109 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1110 } 1111 1112 must(adminClient.WaitForReplication(ctx, "mytable")) 1113 1114 if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { 1115 t.Fatalf("Deleting table: %v", err) 1116 } 1117 tables := list() 1118 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1119 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1120 } 1121 if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { 1122 t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted) 1123 } 1124 1125 tblConf := TableConf{ 1126 TableID: "conftable", 1127 Families: map[string]GCPolicy{ 1128 "fam1": MaxVersionsPolicy(1), 1129 "fam2": MaxVersionsPolicy(2), 1130 }, 1131 } 1132 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1133 t.Fatalf("Creating table from TableConf: %v", err) 1134 } 1135 defer deleteTable(ctx, t, adminClient, tblConf.TableID) 1136 1137 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1138 if err != nil { 1139 t.Fatalf("Getting table info: %v", err) 1140 } 1141 sort.Strings(tblInfo.Families) 1142 wantFams := []string{"fam1", "fam2"} 1143 if !testutil.Equal(tblInfo.Families, wantFams) { 1144 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1145 } 1146 1147 // Populate mytable and drop row ranges 1148 if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { 1149 t.Fatalf("Creating column family: %v", err) 1150 } 1151 1152 client, err := testEnv.NewClient() 1153 if err != nil { 1154 t.Fatalf("NewClient: %v", err) 1155 } 1156 defer client.Close() 1157 1158 tbl := client.Open("mytable") 1159 1160 prefixes := []string{"a", "b", "c"} 1161 for _, prefix := range prefixes { 1162 for i := 0; i < 5; i++ { 1163 mut := NewMutation() 1164 mut.Set("cf", "col", 1000, []byte("1")) 1165 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1166 t.Fatalf("Mutating row: %v", err) 1167 } 1168 } 1169 } 1170 1171 if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { 1172 t.Errorf("DropRowRange a: %v", err) 1173 } 1174 if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { 1175 t.Errorf("DropRowRange c: %v", err) 1176 } 1177 if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { 1178 t.Errorf("DropRowRange x: %v", err) 1179 } 1180 1181 var gotRowCount int 1182 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1183 gotRowCount++ 1184 if !strings.HasPrefix(row.Key(), "b") { 1185 t.Errorf("Invalid row after dropping range: %v", row) 1186 } 1187 return true 1188 })) 1189 if gotRowCount != 5 { 1190 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1191 } 1192 1193 if err = adminClient.DropAllRows(ctx, "mytable"); err != nil { 1194 t.Errorf("DropAllRows mytable: %v", err) 1195 } 1196 1197 gotRowCount = 0 1198 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1199 gotRowCount++ 1200 return true 1201 })) 1202 if gotRowCount != 0 { 1203 t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0) 1204 } 1205} 1206 1207func TestIntegration_TableIam(t *testing.T) { 1208 testEnv, err := NewIntegrationEnv() 1209 if err != nil { 1210 t.Fatalf("IntegrationEnv: %v", err) 1211 } 1212 defer testEnv.Close() 1213 1214 if !testEnv.Config().UseProd { 1215 t.Skip("emulator doesn't support IAM Policy creation") 1216 } 1217 1218 timeout := 5 * time.Minute 1219 ctx, _ := context.WithTimeout(context.Background(), timeout) 1220 1221 adminClient, err := testEnv.NewAdminClient() 1222 if err != nil { 1223 t.Fatalf("NewAdminClient: %v", err) 1224 } 1225 defer adminClient.Close() 1226 1227 defer deleteTable(ctx, t, adminClient, "mytable") 1228 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1229 t.Fatalf("Creating table: %v", err) 1230 } 1231 1232 // Verify that the IAM Controls work for Tables. 1233 iamHandle := adminClient.TableIAM("mytable") 1234 p, err := iamHandle.Policy(ctx) 1235 if err != nil { 1236 t.Errorf("Iam GetPolicy mytable: %v", err) 1237 } 1238 if err = iamHandle.SetPolicy(ctx, p); err != nil { 1239 t.Errorf("Iam SetPolicy mytable: %v", err) 1240 } 1241 if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil { 1242 t.Errorf("Iam TestPermissions mytable: %v", err) 1243 } 1244} 1245 1246func TestIntegration_AdminCreateInstance(t *testing.T) { 1247 if instanceToCreate == "" { 1248 t.Skip("instanceToCreate not set, skipping instance creation testing") 1249 } 1250 1251 testEnv, err := NewIntegrationEnv() 1252 if err != nil { 1253 t.Fatalf("IntegrationEnv: %v", err) 1254 } 1255 defer testEnv.Close() 1256 1257 if !testEnv.Config().UseProd { 1258 t.Skip("emulator doesn't support instance creation") 1259 } 1260 1261 timeout := 5 * time.Minute 1262 ctx, _ := context.WithTimeout(context.Background(), timeout) 1263 1264 iAdminClient, err := testEnv.NewInstanceAdminClient() 1265 if err != nil { 1266 t.Fatalf("NewInstanceAdminClient: %v", err) 1267 } 1268 defer iAdminClient.Close() 1269 1270 clusterID := instanceToCreate + "-cluster" 1271 1272 // Create a development instance 1273 conf := &InstanceConf{ 1274 InstanceId: instanceToCreate, 1275 ClusterId: clusterID, 1276 DisplayName: "test instance", 1277 Zone: instanceToCreateZone, 1278 InstanceType: DEVELOPMENT, 1279 Labels: map[string]string{"test-label-key": "test-label-value"}, 1280 } 1281 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1282 t.Fatalf("CreateInstance: %v", err) 1283 } 1284 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1285 1286 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1287 if err != nil { 1288 t.Fatalf("InstanceInfo: %v", err) 1289 } 1290 1291 // Basic return values are tested elsewhere, check instance type 1292 if iInfo.InstanceType != DEVELOPMENT { 1293 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1294 } 1295 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1296 t.Fatalf("Labels: %v, want: %v", got, want) 1297 } 1298 1299 // Update everything we can about the instance in one call. 1300 confWithClusters := &InstanceWithClustersConfig{ 1301 InstanceID: instanceToCreate, 1302 DisplayName: "new display name", 1303 InstanceType: PRODUCTION, 1304 Labels: map[string]string{"new-label-key": "new-label-value"}, 1305 Clusters: []ClusterConfig{ 1306 {ClusterID: clusterID, NumNodes: 5}}, 1307 } 1308 1309 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1310 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1311 } 1312 1313 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1314 if err != nil { 1315 t.Fatalf("InstanceInfo: %v", err) 1316 } 1317 1318 if iInfo.InstanceType != PRODUCTION { 1319 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1320 } 1321 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1322 t.Fatalf("Labels: %v, want: %v", got, want) 1323 } 1324 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1325 t.Fatalf("Display name: %q, want: %q", got, want) 1326 } 1327 1328 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1329 if err != nil { 1330 t.Fatalf("GetCluster: %v", err) 1331 } 1332 1333 if cInfo.ServeNodes != 5 { 1334 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1335 } 1336} 1337 1338func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) { 1339 1340 // Check the environments 1341 if instanceToCreate == "" { 1342 t.Skip("instanceToCreate not set, skipping instance creation testing") 1343 } 1344 testEnv, err := NewIntegrationEnv() 1345 if err != nil { 1346 t.Fatalf("IntegrationEnv: %v", err) 1347 } 1348 defer testEnv.Close() 1349 if !testEnv.Config().UseProd { 1350 t.Skip("emulator doesn't support instance creation") 1351 } 1352 1353 // Create an instance admin client 1354 timeout := 5 * time.Minute 1355 ctx, _ := context.WithTimeout(context.Background(), timeout) 1356 iAdminClient, err := testEnv.NewInstanceAdminClient() 1357 if err != nil { 1358 t.Fatalf("NewInstanceAdminClient: %v", err) 1359 } 1360 defer iAdminClient.Close() 1361 1362 // Create a test instance 1363 conf := &InstanceConf{ 1364 InstanceId: instanceToCreate, 1365 ClusterId: instanceToCreate + "-cluster", 1366 DisplayName: "test instance", 1367 InstanceType: DEVELOPMENT, 1368 Zone: instanceToCreateZone, 1369 } 1370 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1371 t.Fatalf("CreateInstance: %v", err) 1372 } 1373 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1374 1375 // Check the created test instances 1376 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1377 if err != nil { 1378 t.Fatalf("InstanceInfo: %v", err) 1379 } 1380 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1381 t.Fatalf("Labels: %v, want: %v", got, want) 1382 } 1383 1384 // Test patterns to update instance labels 1385 tests := []struct { 1386 name string 1387 in map[string]string 1388 out map[string]string 1389 }{ 1390 { 1391 name: "update labels", 1392 in: map[string]string{"test-label-key": "test-label-value"}, 1393 out: map[string]string{"test-label-key": "test-label-value"}, 1394 }, 1395 { 1396 name: "update multiple labels", 1397 in: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1398 out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1399 }, 1400 { 1401 name: "not update existing labels", 1402 in: nil, // nil map 1403 out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1404 }, 1405 { 1406 name: "delete labels", 1407 in: map[string]string{}, // empty map 1408 out: nil, 1409 }, 1410 } 1411 for _, tt := range tests { 1412 t.Run(tt.name, func(t *testing.T) { 1413 confWithClusters := &InstanceWithClustersConfig{ 1414 InstanceID: instanceToCreate, 1415 Labels: tt.in, 1416 } 1417 if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1418 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1419 } 1420 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1421 if err != nil { 1422 t.Fatalf("InstanceInfo: %v", err) 1423 } 1424 if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) { 1425 t.Fatalf("Labels: %v, want: %v", got, want) 1426 } 1427 }) 1428 } 1429} 1430 1431func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) { 1432 if instanceToCreate == "" { 1433 t.Skip("instanceToCreate not set, skipping instance update testing") 1434 } 1435 1436 testEnv, err := NewIntegrationEnv() 1437 if err != nil { 1438 t.Fatalf("IntegrationEnv: %v", err) 1439 } 1440 defer testEnv.Close() 1441 1442 if !testEnv.Config().UseProd { 1443 t.Skip("emulator doesn't support instance creation") 1444 } 1445 1446 timeout := 5 * time.Minute 1447 ctx, _ := context.WithTimeout(context.Background(), timeout) 1448 1449 iAdminClient, err := testEnv.NewInstanceAdminClient() 1450 if err != nil { 1451 t.Fatalf("NewInstanceAdminClient: %v", err) 1452 } 1453 defer iAdminClient.Close() 1454 1455 clusterID := instanceToCreate + "-cluster" 1456 1457 // Create a development instance 1458 conf := &InstanceConf{ 1459 InstanceId: instanceToCreate, 1460 ClusterId: clusterID, 1461 DisplayName: "test instance", 1462 Zone: instanceToCreateZone, 1463 InstanceType: DEVELOPMENT, 1464 Labels: map[string]string{"test-label-key": "test-label-value"}, 1465 } 1466 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1467 t.Fatalf("CreateInstance: %v", err) 1468 } 1469 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1470 1471 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1472 if err != nil { 1473 t.Fatalf("InstanceInfo: %v", err) 1474 } 1475 1476 // Basic return values are tested elsewhere, check instance type 1477 if iInfo.InstanceType != DEVELOPMENT { 1478 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1479 } 1480 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1481 t.Fatalf("Labels: %v, want: %v", got, want) 1482 } 1483 1484 // Update everything we can about the instance in one call. 1485 confWithClusters := &InstanceWithClustersConfig{ 1486 InstanceID: instanceToCreate, 1487 DisplayName: "new display name", 1488 InstanceType: PRODUCTION, 1489 Labels: map[string]string{"new-label-key": "new-label-value"}, 1490 Clusters: []ClusterConfig{ 1491 {ClusterID: clusterID, NumNodes: 5}}, 1492 } 1493 1494 results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1495 if err != nil { 1496 t.Fatalf("UpdateInstanceAndSyncClusters: %v", err) 1497 } 1498 1499 wantResults := UpdateInstanceResults{ 1500 InstanceUpdated: true, 1501 UpdatedClusters: []string{clusterID}, 1502 } 1503 if diff := testutil.Diff(*results, wantResults); diff != "" { 1504 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1505 } 1506 1507 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1508 if err != nil { 1509 t.Fatalf("InstanceInfo: %v", err) 1510 } 1511 1512 if iInfo.InstanceType != PRODUCTION { 1513 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1514 } 1515 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1516 t.Fatalf("Labels: %v, want: %v", got, want) 1517 } 1518 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1519 t.Fatalf("Display name: %q, want: %q", got, want) 1520 } 1521 1522 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1523 if err != nil { 1524 t.Fatalf("GetCluster: %v", err) 1525 } 1526 1527 if cInfo.ServeNodes != 5 { 1528 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1529 } 1530 1531 // Now add a second cluster as the only change. The first cluster must also be provided so 1532 // it is not removed. 1533 clusterID2 := clusterID + "-2" 1534 confWithClusters = &InstanceWithClustersConfig{ 1535 InstanceID: instanceToCreate, 1536 Clusters: []ClusterConfig{ 1537 {ClusterID: clusterID}, 1538 {ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}}, 1539 } 1540 1541 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1542 if err != nil { 1543 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1544 } 1545 1546 wantResults = UpdateInstanceResults{ 1547 InstanceUpdated: false, 1548 CreatedClusters: []string{clusterID2}, 1549 } 1550 if diff := testutil.Diff(*results, wantResults); diff != "" { 1551 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1552 } 1553 1554 // Now update one cluster and delete the other 1555 confWithClusters = &InstanceWithClustersConfig{ 1556 InstanceID: instanceToCreate, 1557 Clusters: []ClusterConfig{ 1558 {ClusterID: clusterID, NumNodes: 4}}, 1559 } 1560 1561 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1562 if err != nil { 1563 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1564 } 1565 1566 wantResults = UpdateInstanceResults{ 1567 InstanceUpdated: false, 1568 UpdatedClusters: []string{clusterID}, 1569 DeletedClusters: []string{clusterID2}, 1570 } 1571 1572 if diff := testutil.Diff(*results, wantResults); diff != "" { 1573 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1574 } 1575 1576 // Make sure the instance looks as we would expect 1577 clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId) 1578 if err != nil { 1579 t.Fatalf("Clusters: %v", err) 1580 } 1581 1582 if len(clusters) != 1 { 1583 t.Fatalf("Clusters length %v, want: 1", len(clusters)) 1584 } 1585 1586 wantCluster := &ClusterInfo{ 1587 Name: clusterID, 1588 Zone: instanceToCreateZone, 1589 ServeNodes: 4, 1590 State: "READY", 1591 } 1592 if diff := testutil.Diff(clusters[0], wantCluster); diff != "" { 1593 t.Fatalf("InstanceEquality: got - want +\n%s", diff) 1594 } 1595} 1596 1597func TestIntegration_AdminSnapshot(t *testing.T) { 1598 testEnv, err := NewIntegrationEnv() 1599 if err != nil { 1600 t.Fatalf("IntegrationEnv: %v", err) 1601 } 1602 defer testEnv.Close() 1603 1604 if !testEnv.Config().UseProd { 1605 t.Skip("emulator doesn't support snapshots") 1606 } 1607 1608 timeout := 2 * time.Second 1609 if testEnv.Config().UseProd { 1610 timeout = 5 * time.Minute 1611 } 1612 ctx, _ := context.WithTimeout(context.Background(), timeout) 1613 1614 adminClient, err := testEnv.NewAdminClient() 1615 if err != nil { 1616 t.Fatalf("NewAdminClient: %v", err) 1617 } 1618 defer adminClient.Close() 1619 1620 table := testEnv.Config().Table 1621 cluster := testEnv.Config().Cluster 1622 1623 list := func(cluster string) ([]*SnapshotInfo, error) { 1624 infos := []*SnapshotInfo(nil) 1625 1626 it := adminClient.Snapshots(ctx, cluster) 1627 for { 1628 s, err := it.Next() 1629 if err == iterator.Done { 1630 break 1631 } 1632 if err != nil { 1633 return nil, err 1634 } 1635 infos = append(infos, s) 1636 } 1637 return infos, err 1638 } 1639 1640 // Delete the table at the end of the test. Schedule ahead of time 1641 // in case the client fails 1642 defer deleteTable(ctx, t, adminClient, table) 1643 1644 if err := adminClient.CreateTable(ctx, table); err != nil { 1645 t.Fatalf("Creating table: %v", err) 1646 } 1647 1648 // Precondition: no snapshots 1649 snapshots, err := list(cluster) 1650 if err != nil { 1651 t.Fatalf("Initial snapshot list: %v", err) 1652 } 1653 if got, want := len(snapshots), 0; got != want { 1654 t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) 1655 } 1656 1657 // Create snapshot 1658 defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") 1659 1660 if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { 1661 t.Fatalf("Creating snaphot: %v", err) 1662 } 1663 1664 // List snapshot 1665 snapshots, err = list(cluster) 1666 if err != nil { 1667 t.Fatalf("Listing snapshots: %v", err) 1668 } 1669 if got, want := len(snapshots), 1; got != want { 1670 t.Fatalf("Listing snapshot count: %d, want: %d", got, want) 1671 } 1672 if got, want := snapshots[0].Name, "mysnapshot"; got != want { 1673 t.Fatalf("Snapshot name: %s, want: %s", got, want) 1674 } 1675 if got, want := snapshots[0].SourceTable, table; got != want { 1676 t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) 1677 } 1678 if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 1679 t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) 1680 } 1681 1682 // Get snapshot 1683 snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") 1684 if err != nil { 1685 t.Fatalf("SnapshotInfo: %v", snapshot) 1686 } 1687 if got, want := *snapshot, *snapshots[0]; got != want { 1688 t.Fatalf("SnapshotInfo: %v, want: %v", got, want) 1689 } 1690 1691 // Restore 1692 restoredTable := table + "-restored" 1693 defer deleteTable(ctx, t, adminClient, restoredTable) 1694 if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil { 1695 t.Fatalf("CreateTableFromSnapshot: %v", err) 1696 } 1697 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 1698 t.Fatalf("Restored TableInfo: %v", err) 1699 } 1700 1701 // Delete snapshot 1702 if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { 1703 t.Fatalf("DeleteSnapshot: %v", err) 1704 } 1705 snapshots, err = list(cluster) 1706 if err != nil { 1707 t.Fatalf("List after Delete: %v", err) 1708 } 1709 if got, want := len(snapshots), 0; got != want { 1710 t.Fatalf("List after delete len: %d, want: %d", got, want) 1711 } 1712} 1713 1714func TestIntegration_Granularity(t *testing.T) { 1715 testEnv, err := NewIntegrationEnv() 1716 if err != nil { 1717 t.Fatalf("IntegrationEnv: %v", err) 1718 } 1719 defer testEnv.Close() 1720 1721 timeout := 2 * time.Second 1722 if testEnv.Config().UseProd { 1723 timeout = 5 * time.Minute 1724 } 1725 ctx, _ := context.WithTimeout(context.Background(), timeout) 1726 1727 adminClient, err := testEnv.NewAdminClient() 1728 if err != nil { 1729 t.Fatalf("NewAdminClient: %v", err) 1730 } 1731 defer adminClient.Close() 1732 1733 list := func() []string { 1734 tbls, err := adminClient.Tables(ctx) 1735 if err != nil { 1736 t.Fatalf("Fetching list of tables: %v", err) 1737 } 1738 sort.Strings(tbls) 1739 return tbls 1740 } 1741 containsAll := func(got, want []string) bool { 1742 gotSet := make(map[string]bool) 1743 1744 for _, s := range got { 1745 gotSet[s] = true 1746 } 1747 for _, s := range want { 1748 if !gotSet[s] { 1749 return false 1750 } 1751 } 1752 return true 1753 } 1754 1755 defer deleteTable(ctx, t, adminClient, "mytable") 1756 1757 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1758 t.Fatalf("Creating table: %v", err) 1759 } 1760 1761 tables := list() 1762 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1763 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1764 } 1765 1766 // calling ModifyColumnFamilies to check the granularity of table 1767 prefix := adminClient.instancePrefix() 1768 req := &btapb.ModifyColumnFamiliesRequest{ 1769 Name: prefix + "/tables/" + "mytable", 1770 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 1771 Id: "cf", 1772 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, 1773 }}, 1774 } 1775 table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req) 1776 if err != nil { 1777 t.Fatalf("Creating column family: %v", err) 1778 } 1779 if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { 1780 t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) 1781 } 1782} 1783 1784func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { 1785 testEnv, err := NewIntegrationEnv() 1786 if err != nil { 1787 t.Fatalf("IntegrationEnv: %v", err) 1788 } 1789 defer testEnv.Close() 1790 1791 timeout := 2 * time.Second 1792 if testEnv.Config().UseProd { 1793 timeout = 5 * time.Minute 1794 } 1795 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1796 defer cancel() 1797 1798 adminClient, err := testEnv.NewAdminClient() 1799 if err != nil { 1800 t.Fatalf("NewAdminClient: %v", err) 1801 } 1802 defer adminClient.Close() 1803 1804 iAdminClient, err := testEnv.NewInstanceAdminClient() 1805 if err != nil { 1806 t.Fatalf("NewInstanceAdminClient: %v", err) 1807 } 1808 1809 if iAdminClient == nil { 1810 return 1811 } 1812 1813 defer iAdminClient.Close() 1814 profile := ProfileConf{ 1815 ProfileID: "app_profile1", 1816 InstanceID: adminClient.instance, 1817 ClusterID: testEnv.Config().Cluster, 1818 Description: "creating new app profile 1", 1819 RoutingPolicy: SingleClusterRouting, 1820 } 1821 1822 createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) 1823 if err != nil { 1824 t.Fatalf("Creating app profile: %v", err) 1825 1826 } 1827 1828 gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1829 1830 if err != nil { 1831 t.Fatalf("Get app profile: %v", err) 1832 } 1833 1834 if !proto.Equal(createdProfile, gotProfile) { 1835 t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) 1836 1837 } 1838 1839 list := func(instanceID string) ([]*btapb.AppProfile, error) { 1840 profiles := []*btapb.AppProfile(nil) 1841 1842 it := iAdminClient.ListAppProfiles(ctx, instanceID) 1843 for { 1844 s, err := it.Next() 1845 if err == iterator.Done { 1846 break 1847 } 1848 if err != nil { 1849 return nil, err 1850 } 1851 profiles = append(profiles, s) 1852 } 1853 return profiles, err 1854 } 1855 1856 profiles, err := list(adminClient.instance) 1857 if err != nil { 1858 t.Fatalf("List app profile: %v", err) 1859 } 1860 1861 if got, want := len(profiles), 1; got != want { 1862 t.Fatalf("Initial app profile list len: %d, want: %d", got, want) 1863 } 1864 1865 for _, test := range []struct { 1866 desc string 1867 uattrs ProfileAttrsToUpdate 1868 want *btapb.AppProfile // nil means error 1869 }{ 1870 { 1871 desc: "empty update", 1872 uattrs: ProfileAttrsToUpdate{}, 1873 want: nil, 1874 }, 1875 1876 { 1877 desc: "empty description update", 1878 uattrs: ProfileAttrsToUpdate{Description: ""}, 1879 want: &btapb.AppProfile{ 1880 Name: gotProfile.Name, 1881 Description: "", 1882 RoutingPolicy: gotProfile.RoutingPolicy, 1883 Etag: gotProfile.Etag}, 1884 }, 1885 { 1886 desc: "routing update", 1887 uattrs: ProfileAttrsToUpdate{ 1888 RoutingPolicy: SingleClusterRouting, 1889 ClusterID: testEnv.Config().Cluster, 1890 }, 1891 want: &btapb.AppProfile{ 1892 Name: gotProfile.Name, 1893 Description: "", 1894 Etag: gotProfile.Etag, 1895 RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ 1896 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1897 ClusterId: testEnv.Config().Cluster, 1898 }}, 1899 }, 1900 }, 1901 } { 1902 err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) 1903 if err != nil { 1904 if test.want != nil { 1905 t.Errorf("%s: %v", test.desc, err) 1906 } 1907 continue 1908 } 1909 if err == nil && test.want == nil { 1910 t.Errorf("%s: got nil, want error", test.desc) 1911 continue 1912 } 1913 1914 got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1915 1916 if !proto.Equal(got, test.want) { 1917 t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want) 1918 } 1919 1920 } 1921 1922 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 1923 if err != nil { 1924 t.Fatalf("Delete app profile: %v", err) 1925 } 1926 1927} 1928 1929func TestIntegration_InstanceUpdate(t *testing.T) { 1930 testEnv, err := NewIntegrationEnv() 1931 if err != nil { 1932 t.Fatalf("IntegrationEnv: %v", err) 1933 } 1934 defer testEnv.Close() 1935 1936 timeout := 2 * time.Second 1937 if testEnv.Config().UseProd { 1938 timeout = 5 * time.Minute 1939 } 1940 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1941 defer cancel() 1942 1943 adminClient, err := testEnv.NewAdminClient() 1944 if err != nil { 1945 t.Fatalf("NewAdminClient: %v", err) 1946 } 1947 1948 defer adminClient.Close() 1949 1950 iAdminClient, err := testEnv.NewInstanceAdminClient() 1951 if err != nil { 1952 t.Fatalf("NewInstanceAdminClient: %v", err) 1953 } 1954 1955 if iAdminClient == nil { 1956 return 1957 } 1958 1959 defer iAdminClient.Close() 1960 1961 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1962 if err != nil { 1963 t.Errorf("InstanceInfo: %v", err) 1964 } 1965 1966 if iInfo.Name != adminClient.instance { 1967 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1968 } 1969 1970 if iInfo.DisplayName != adminClient.instance { 1971 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1972 } 1973 1974 const numNodes = 4 1975 // update cluster nodes 1976 if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { 1977 t.Errorf("UpdateCluster: %v", err) 1978 } 1979 1980 // get cluster after updating 1981 cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster) 1982 if err != nil { 1983 t.Errorf("GetCluster %v", err) 1984 } 1985 if cis.ServeNodes != int(numNodes) { 1986 t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes)) 1987 } 1988} 1989 1990func TestIntegration_AdminBackup(t *testing.T) { 1991 testEnv, err := NewIntegrationEnv() 1992 if err != nil { 1993 t.Fatalf("IntegrationEnv: %v", err) 1994 } 1995 defer testEnv.Close() 1996 1997 if !testEnv.Config().UseProd { 1998 t.Skip("emulator doesn't support backups") 1999 } 2000 2001 timeout := 5 * time.Minute 2002 ctx, _ := context.WithTimeout(context.Background(), timeout) 2003 2004 adminClient, err := testEnv.NewAdminClient() 2005 if err != nil { 2006 t.Fatalf("NewAdminClient: %v", err) 2007 } 2008 defer adminClient.Close() 2009 2010 table := testEnv.Config().Table 2011 cluster := testEnv.Config().Cluster 2012 2013 // Delete the table at the end of the test. Schedule ahead of time 2014 // in case the client fails 2015 defer deleteTable(ctx, t, adminClient, table) 2016 2017 list := func(cluster string) ([]*BackupInfo, error) { 2018 infos := []*BackupInfo(nil) 2019 2020 it := adminClient.Backups(ctx, cluster) 2021 for { 2022 s, err := it.Next() 2023 if err == iterator.Done { 2024 break 2025 } 2026 if err != nil { 2027 return nil, err 2028 } 2029 infos = append(infos, s) 2030 } 2031 return infos, err 2032 } 2033 2034 if err := adminClient.CreateTable(ctx, table); err != nil { 2035 t.Fatalf("Creating table: %v", err) 2036 } 2037 2038 // Precondition: no backups 2039 backups, err := list(cluster) 2040 if err != nil { 2041 t.Fatalf("Initial backup list: %v", err) 2042 } 2043 if got, want := len(backups), 0; got != want { 2044 t.Fatalf("Initial backup list len: %d, want: %d", got, want) 2045 } 2046 2047 // Create backup 2048 defer adminClient.DeleteBackup(ctx, cluster, "mybackup") 2049 2050 if err = adminClient.CreateBackup(ctx, table, cluster, "mybackup", time.Now().Add(8*time.Hour)); err != nil { 2051 t.Fatalf("Creating backup: %v", err) 2052 } 2053 2054 // List backup 2055 backups, err = list(cluster) 2056 if err != nil { 2057 t.Fatalf("Listing backups: %v", err) 2058 } 2059 if got, want := len(backups), 1; got != want { 2060 t.Fatalf("Listing backup count: %d, want: %d", got, want) 2061 } 2062 if got, want := backups[0].Name, "mybackup"; got != want { 2063 t.Fatalf("Backup name: %s, want: %s", got, want) 2064 } 2065 if got, want := backups[0].SourceTable, table; got != want { 2066 t.Fatalf("Backup SourceTable: %s, want: %s", got, want) 2067 } 2068 if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 2069 t.Fatalf("Backup ExpireTime: %s, want: %s", got, want) 2070 } 2071 2072 // Get backup 2073 backup, err := adminClient.BackupInfo(ctx, cluster, "mybackup") 2074 if err != nil { 2075 t.Fatalf("BackupInfo: %v", backup) 2076 } 2077 if got, want := *backup, *backups[0]; got != want { 2078 t.Fatalf("BackupInfo: %v, want: %v", got, want) 2079 } 2080 2081 // Update backup 2082 newExpireTime := time.Now().Add(10 * time.Hour) 2083 err = adminClient.UpdateBackup(ctx, cluster, "mybackup", newExpireTime) 2084 if err != nil { 2085 t.Fatalf("UpdateBackup failed: %v", err) 2086 } 2087 2088 // Check that updated backup has the correct expire time 2089 updatedBackup, err := adminClient.BackupInfo(ctx, cluster, "mybackup") 2090 if err != nil { 2091 t.Fatalf("BackupInfo: %v", err) 2092 } 2093 backup.ExpireTime = newExpireTime 2094 // Server clock and local clock may not be perfectly sync'ed. 2095 if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute { 2096 t.Fatalf("BackupInfo: %v, want: %v", got, want) 2097 } 2098 2099 // Restore backup 2100 restoredTable := table + "-restored" 2101 defer deleteTable(ctx, t, adminClient, restoredTable) 2102 if err = adminClient.RestoreTable(ctx, restoredTable, cluster, "mybackup"); err != nil { 2103 t.Fatalf("RestoreTable: %v", err) 2104 } 2105 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 2106 t.Fatalf("Restored TableInfo: %v", err) 2107 } 2108 2109 // Delete backup 2110 if err = adminClient.DeleteBackup(ctx, cluster, "mybackup"); err != nil { 2111 t.Fatalf("DeleteBackup: %v", err) 2112 } 2113 backups, err = list(cluster) 2114 if err != nil { 2115 t.Fatalf("List after Delete: %v", err) 2116 } 2117 if got, want := len(backups), 0; got != want { 2118 t.Fatalf("List after delete len: %d, want: %d", got, want) 2119 } 2120} 2121 2122func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { 2123 testEnv, err := NewIntegrationEnv() 2124 if err != nil { 2125 return nil, nil, nil, "", nil, err 2126 } 2127 2128 var timeout time.Duration 2129 if testEnv.Config().UseProd { 2130 timeout = 10 * time.Minute 2131 t.Logf("Running test against production") 2132 } else { 2133 timeout = 5 * time.Minute 2134 t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) 2135 } 2136 ctx, cancel := context.WithTimeout(ctx, timeout) 2137 2138 client, err := testEnv.NewClient() 2139 if err != nil { 2140 return nil, nil, nil, "", nil, err 2141 } 2142 2143 adminClient, err := testEnv.NewAdminClient() 2144 if err != nil { 2145 return nil, nil, nil, "", nil, err 2146 } 2147 2148 if testEnv.Config().UseProd { 2149 // TODO: tables may not be successfully deleted in some cases, and will 2150 // become obsolete. We may need a way to automatically delete them. 2151 tableName = tableNameSpace.New() 2152 } else { 2153 tableName = testEnv.Config().Table 2154 } 2155 2156 if err := adminClient.CreateTable(ctx, tableName); err != nil { 2157 cancel() 2158 return nil, nil, nil, "", nil, err 2159 } 2160 if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { 2161 cancel() 2162 return nil, nil, nil, "", nil, err 2163 } 2164 2165 return client, adminClient, client.Open(tableName), tableName, func() { 2166 if err := adminClient.DeleteTable(ctx, tableName); err != nil { 2167 t.Errorf("DeleteTable got error %v", err) 2168 } 2169 cancel() 2170 client.Close() 2171 adminClient.Close() 2172 }, nil 2173} 2174 2175func formatReadItem(ri ReadItem) string { 2176 // Use the column qualifier only to make the test data briefer. 2177 col := ri.Column[strings.Index(ri.Column, ":")+1:] 2178 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 2179} 2180 2181func fill(b, sub []byte) { 2182 for len(b) > len(sub) { 2183 n := copy(b, sub) 2184 b = b[n:] 2185 } 2186} 2187 2188func clearTimestamps(r Row) { 2189 for _, ris := range r { 2190 for i := range ris { 2191 ris[i].Timestamp = 0 2192 } 2193 } 2194} 2195 2196func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) { 2197 bo := gax.Backoff{ 2198 Initial: 100 * time.Millisecond, 2199 Max: 2 * time.Second, 2200 Multiplier: 1.2, 2201 } 2202 ctx, _ = context.WithTimeout(ctx, time.Second*30) 2203 2204 err := internal.Retry(ctx, bo, func() (bool, error) { 2205 err := ac.DeleteTable(ctx, name) 2206 if err != nil { 2207 return true, err 2208 } 2209 return false, nil 2210 }) 2211 if err != nil { 2212 t.Logf("DeleteTable: %v", err) 2213 } 2214} 2215