1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "flag" 22 "fmt" 23 "log" 24 "math" 25 "math/rand" 26 "os" 27 "os/exec" 28 "reflect" 29 "sort" 30 "strconv" 31 "strings" 32 "sync" 33 "testing" 34 "time" 35 36 "cloud.google.com/go/iam" 37 "cloud.google.com/go/internal" 38 "cloud.google.com/go/internal/testutil" 39 "cloud.google.com/go/internal/uid" 40 "github.com/golang/protobuf/proto" 41 "github.com/google/go-cmp/cmp" 42 gax "github.com/googleapis/gax-go/v2" 43 "google.golang.org/api/iterator" 44 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 45 v1 "google.golang.org/genproto/googleapis/iam/v1" 46 longrunning "google.golang.org/genproto/googleapis/longrunning" 47 grpc "google.golang.org/grpc" 48 "google.golang.org/grpc/codes" 49 "google.golang.org/grpc/status" 50 "google.golang.org/protobuf/types/known/emptypb" 51) 52 53const ( 54 directPathIPV6Prefix = "[2001:4860:8040" 55 directPathIPV4Prefix = "34.126" 56 timeUntilResourceCleanup = time.Hour * 12 // 12 hours 57 prefixOfInstanceResources = "bt-it-" 58) 59 60var ( 61 presidentsSocialGraph = map[string][]string{ 62 "wmckinley": {"tjefferson"}, 63 "gwashington": {"j§adams"}, 64 "tjefferson": {"gwashington", "j§adams"}, 65 "j§adams": {"gwashington", "tjefferson"}, 66 } 67 68 tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true}) 69 myTableName = fmt.Sprintf("mytable-%d", time.Now().Unix()) 70 myOtherTableName = 
fmt.Sprintf("myothertable-%d", time.Now().Unix()) 71) 72 73func populatePresidentsGraph(table *Table) error { 74 ctx := context.Background() 75 for row, ss := range presidentsSocialGraph { 76 mut := NewMutation() 77 for _, name := range ss { 78 mut.Set("follows", name, 1000, []byte("1")) 79 } 80 if err := table.Apply(ctx, row, mut); err != nil { 81 return fmt.Errorf("Mutating row %q: %v", row, err) 82 } 83 } 84 return nil 85} 86 87var instanceToCreate string 88 89func init() { 90 if runCreateInstanceTests { 91 instanceToCreate = fmt.Sprintf("bt-it-%d", time.Now().Unix()) 92 } 93} 94 95func TestMain(m *testing.M) { 96 flag.Parse() 97 98 env, err := NewIntegrationEnv() 99 if err != nil { 100 panic(fmt.Sprintf("there was an issue creating an integration env: %v", err)) 101 } 102 c := env.Config() 103 if c.UseProd { 104 fmt.Printf( 105 "Note: when using prod, you must first create an instance:\n"+ 106 "cbt createinstance %s %s %s %s %s SSD\n", 107 c.Instance, c.Instance, 108 c.Cluster, "us-central1-b", "1", 109 ) 110 } 111 exit := m.Run() 112 if err := cleanup(c); err != nil { 113 log.Printf("Post-test cleanup failed: %v", err) 114 } 115 os.Exit(exit) 116} 117 118func cleanup(c IntegrationTestConfig) error { 119 // Cleanup resources marked with bt-it- after a time delay 120 if !c.UseProd { 121 return nil 122 } 123 ctx := context.Background() 124 iac, err := NewInstanceAdminClient(ctx, c.Project) 125 if err != nil { 126 return err 127 } 128 instances, err := iac.Instances(ctx) 129 if err != nil { 130 return err 131 } 132 133 for _, info := range instances { 134 if strings.HasPrefix(info.Name, prefixOfInstanceResources) { 135 timestamp := info.Name[len(prefixOfInstanceResources):] 136 t, err := strconv.ParseInt(timestamp, 10, 64) 137 if err != nil { 138 return err 139 } 140 uT := time.Unix(t, 0) 141 if time.Now().After(uT.Add(timeUntilResourceCleanup)) { 142 iac.DeleteInstance(ctx, info.Name) 143 } 144 } 145 } 146 147 return nil 148} 149 150func 
TestIntegration_ConditionalMutations(t *testing.T) { 151 ctx := context.Background() 152 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 153 if err != nil { 154 t.Fatal(err) 155 } 156 defer cleanup() 157 158 if err := populatePresidentsGraph(table); err != nil { 159 t.Fatal(err) 160 } 161 162 // Do a conditional mutation with a complex filter. 163 mutTrue := NewMutation() 164 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 165 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 166 mut := NewCondMutation(filter, mutTrue, nil) 167 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 168 t.Fatalf("Conditionally mutating row: %v", err) 169 } 170 verifyDirectPathRemoteAddress(testEnv, t) 171 // Do a second condition mutation with a filter that does not match, 172 // and thus no changes should be made. 173 mutTrue = NewMutation() 174 mutTrue.DeleteRow() 175 filter = ColumnFilter("snoop.dogg") 176 mut = NewCondMutation(filter, mutTrue, nil) 177 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 178 t.Fatalf("Conditionally mutating row: %v", err) 179 } 180 verifyDirectPathRemoteAddress(testEnv, t) 181 182 // Fetch a row. 
183 row, err := table.ReadRow(ctx, "j§adams") 184 if err != nil { 185 t.Fatalf("Reading a row: %v", err) 186 } 187 verifyDirectPathRemoteAddress(testEnv, t) 188 wantRow := Row{ 189 "follows": []ReadItem{ 190 {Row: "j§adams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 191 {Row: "j§adams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 192 }, 193 } 194 if !testutil.Equal(row, wantRow) { 195 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 196 } 197} 198 199func TestIntegration_PartialReadRows(t *testing.T) { 200 ctx := context.Background() 201 _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 202 if err != nil { 203 t.Fatal(err) 204 } 205 defer cleanup() 206 207 if err := populatePresidentsGraph(table); err != nil { 208 t.Fatal(err) 209 } 210 211 // Do a scan and stop part way through. 212 // Verify that the ReadRows callback doesn't keep running. 213 stopped := false 214 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 215 if r.Key() < "h" { 216 return true 217 } 218 if !stopped { 219 stopped = true 220 return false 221 } 222 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 223 return false 224 }) 225 if err != nil { 226 t.Fatalf("Partial ReadRows: %v", err) 227 } 228} 229 230func TestIntegration_ReadRowList(t *testing.T) { 231 ctx := context.Background() 232 _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 233 if err != nil { 234 t.Fatal(err) 235 } 236 defer cleanup() 237 238 if err := populatePresidentsGraph(table); err != nil { 239 t.Fatal(err) 240 } 241 242 // Read a RowList 243 var elt []string 244 keys := RowList{"wmckinley", "gwashington", "j§adams"} 245 want := "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,wmckinley-tjefferson-1" 246 err = table.ReadRows(ctx, keys, func(r Row) bool { 247 for _, ris := range r { 248 for _, ri := range ris { 249 elt = append(elt, formatReadItem(ri)) 250 } 251 } 252 return true 253 
}) 254 if err != nil { 255 t.Fatalf("read RowList: %v", err) 256 } 257 258 if got := strings.Join(elt, ","); got != want { 259 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 260 } 261} 262 263func TestIntegration_DeleteRow(t *testing.T) { 264 ctx := context.Background() 265 _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 266 if err != nil { 267 t.Fatal(err) 268 } 269 defer cleanup() 270 271 if err := populatePresidentsGraph(table); err != nil { 272 t.Fatal(err) 273 } 274 275 // Delete a row and check it goes away. 276 mut := NewMutation() 277 mut.DeleteRow() 278 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 279 t.Fatalf("Apply DeleteRow: %v", err) 280 } 281 row, err := table.ReadRow(ctx, "wmckinley") 282 if err != nil { 283 t.Fatalf("Reading a row after DeleteRow: %v", err) 284 } 285 if len(row) != 0 { 286 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 287 } 288} 289 290func TestIntegration_ReadModifyWrite(t *testing.T) { 291 ctx := context.Background() 292 testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 293 if err != nil { 294 t.Fatal(err) 295 } 296 defer cleanup() 297 298 if err := populatePresidentsGraph(table); err != nil { 299 t.Fatal(err) 300 } 301 302 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 303 t.Fatalf("Creating column family: %v", err) 304 } 305 306 appendRMW := func(b []byte) *ReadModifyWrite { 307 rmw := NewReadModifyWrite() 308 rmw.AppendValue("counter", "likes", b) 309 return rmw 310 } 311 incRMW := func(n int64) *ReadModifyWrite { 312 rmw := NewReadModifyWrite() 313 rmw.Increment("counter", "likes", n) 314 return rmw 315 } 316 rmwSeq := []struct { 317 desc string 318 rmw *ReadModifyWrite 319 want []byte 320 }{ 321 { 322 desc: "append #1", 323 rmw: appendRMW([]byte{0, 0, 0}), 324 want: []byte{0, 0, 0}, 325 }, 326 { 327 desc: "append #2", 328 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 
17 329 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 330 }, 331 { 332 desc: "increment", 333 rmw: incRMW(8), 334 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 335 }, 336 } 337 for _, step := range rmwSeq { 338 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 339 if err != nil { 340 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 341 } 342 verifyDirectPathRemoteAddress(testEnv, t) 343 // Make sure the modified cell returned by the RMW operation has a timestamp. 344 if row["counter"][0].Timestamp == 0 { 345 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 346 } 347 clearTimestamps(row) 348 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 349 if !testutil.Equal(row, wantRow) { 350 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 351 } 352 } 353 354 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 355 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 356 if err != nil { 357 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 358 } 359 verifyDirectPathRemoteAddress(testEnv, t) 360 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 361 if err != nil { 362 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 363 } 364 verifyDirectPathRemoteAddress(testEnv, t) 365 // Get only the correct row back on read. 
366 r, err := table.ReadRow(ctx, "issue-723-1") 367 if err != nil { 368 t.Fatalf("Reading row: %v", err) 369 } 370 verifyDirectPathRemoteAddress(testEnv, t) 371 if r.Key() != "issue-723-1" { 372 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 373 } 374} 375 376func TestIntegration_ArbitraryTimestamps(t *testing.T) { 377 ctx := context.Background() 378 _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 379 if err != nil { 380 t.Fatal(err) 381 } 382 defer cleanup() 383 384 // Test arbitrary timestamps more thoroughly. 385 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 386 t.Fatalf("Creating column family: %v", err) 387 } 388 const numVersions = 4 389 mut := NewMutation() 390 for i := 1; i < numVersions; i++ { 391 // Timestamps are used in thousands because the server 392 // only permits that granularity. 393 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 394 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 395 } 396 if err := table.Apply(ctx, "testrow", mut); err != nil { 397 t.Fatalf("Mutating row: %v", err) 398 } 399 r, err := table.ReadRow(ctx, "testrow") 400 if err != nil { 401 t.Fatalf("Reading row: %v", err) 402 } 403 wantRow := Row{"ts": []ReadItem{ 404 // These should be returned in descending timestamp order. 
405 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 406 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 407 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 408 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 409 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 410 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 411 }} 412 if !testutil.Equal(r, wantRow) { 413 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 414 } 415 416 // Do the same read, but filter to the latest two versions. 417 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 418 if err != nil { 419 t.Fatalf("Reading row: %v", err) 420 } 421 wantRow = Row{"ts": []ReadItem{ 422 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 423 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 424 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 425 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 426 }} 427 if !testutil.Equal(r, wantRow) { 428 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 429 } 430 // Check cell offset / limit 431 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 432 if err != nil { 433 t.Fatalf("Reading row: %v", err) 434 } 435 wantRow = Row{"ts": []ReadItem{ 436 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 437 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 438 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 439 }} 440 if !testutil.Equal(r, wantRow) { 441 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 442 } 443 r, err = table.ReadRow(ctx, "testrow", 
RowFilter(CellsPerRowOffsetFilter(3))) 444 if err != nil { 445 t.Fatalf("Reading row: %v", err) 446 } 447 wantRow = Row{"ts": []ReadItem{ 448 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 449 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 450 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 451 }} 452 if !testutil.Equal(r, wantRow) { 453 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 454 } 455 // Check timestamp range filtering (with truncation) 456 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 457 if err != nil { 458 t.Fatalf("Reading row: %v", err) 459 } 460 wantRow = Row{"ts": []ReadItem{ 461 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 462 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 463 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 464 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 465 }} 466 if !testutil.Equal(r, wantRow) { 467 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 468 } 469 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 470 if err != nil { 471 t.Fatalf("Reading row: %v", err) 472 } 473 wantRow = Row{"ts": []ReadItem{ 474 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 475 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 476 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 477 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 478 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 479 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 480 }} 481 if !testutil.Equal(r, wantRow) 
{ 482 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 483 } 484 // Delete non-existing cells, no such column family in this row 485 // Should not delete anything 486 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 487 t.Fatalf("Creating column family: %v", err) 488 } 489 mut = NewMutation() 490 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 491 if err := table.Apply(ctx, "testrow", mut); err != nil { 492 t.Fatalf("Mutating row: %v", err) 493 } 494 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 495 if err != nil { 496 t.Fatalf("Reading row: %v", err) 497 } 498 if !testutil.Equal(r, wantRow) { 499 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 500 } 501 // Delete non-existing cells, no such column in this column family 502 // Should not delete anything 503 mut = NewMutation() 504 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 505 if err := table.Apply(ctx, "testrow", mut); err != nil { 506 t.Fatalf("Mutating row: %v", err) 507 } 508 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 509 if err != nil { 510 t.Fatalf("Reading row: %v", err) 511 } 512 if !testutil.Equal(r, wantRow) { 513 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 514 } 515 // Delete the cell with timestamp 2000 and repeat the last read, 516 // checking that we get ts 3000 and ts 1000. 
517 mut = NewMutation() 518 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 519 if err := table.Apply(ctx, "testrow", mut); err != nil { 520 t.Fatalf("Mutating row: %v", err) 521 } 522 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 523 if err != nil { 524 t.Fatalf("Reading row: %v", err) 525 } 526 wantRow = Row{"ts": []ReadItem{ 527 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 528 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 529 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 530 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 531 }} 532 if !testutil.Equal(r, wantRow) { 533 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 534 } 535 536 // Check DeleteCellsInFamily 537 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 538 t.Fatalf("Creating column family: %v", err) 539 } 540 541 mut = NewMutation() 542 mut.Set("status", "start", 2000, []byte("2")) 543 mut.Set("status", "end", 3000, []byte("3")) 544 mut.Set("ts", "col", 1000, []byte("1")) 545 if err := table.Apply(ctx, "row1", mut); err != nil { 546 t.Fatalf("Mutating row: %v", err) 547 } 548 if err := table.Apply(ctx, "row2", mut); err != nil { 549 t.Fatalf("Mutating row: %v", err) 550 } 551 552 mut = NewMutation() 553 mut.DeleteCellsInFamily("status") 554 if err := table.Apply(ctx, "row1", mut); err != nil { 555 t.Fatalf("Delete cf: %v", err) 556 } 557 558 // ColumnFamily removed 559 r, err = table.ReadRow(ctx, "row1") 560 if err != nil { 561 t.Fatalf("Reading row: %v", err) 562 } 563 wantRow = Row{"ts": []ReadItem{ 564 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 565 }} 566 if !testutil.Equal(r, wantRow) { 567 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 568 } 569 570 // ColumnFamily not 
removed 571 r, err = table.ReadRow(ctx, "row2") 572 if err != nil { 573 t.Fatalf("Reading row: %v", err) 574 } 575 wantRow = Row{ 576 "ts": []ReadItem{ 577 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 578 }, 579 "status": []ReadItem{ 580 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 581 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 582 }, 583 } 584 if !testutil.Equal(r, wantRow) { 585 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 586 } 587 588 // Check DeleteCellsInColumn 589 mut = NewMutation() 590 mut.Set("status", "start", 2000, []byte("2")) 591 mut.Set("status", "middle", 3000, []byte("3")) 592 mut.Set("status", "end", 1000, []byte("1")) 593 if err := table.Apply(ctx, "row3", mut); err != nil { 594 t.Fatalf("Mutating row: %v", err) 595 } 596 mut = NewMutation() 597 mut.DeleteCellsInColumn("status", "middle") 598 if err := table.Apply(ctx, "row3", mut); err != nil { 599 t.Fatalf("Delete column: %v", err) 600 } 601 r, err = table.ReadRow(ctx, "row3") 602 if err != nil { 603 t.Fatalf("Reading row: %v", err) 604 } 605 wantRow = Row{ 606 "status": []ReadItem{ 607 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 608 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 609 }, 610 } 611 if !testutil.Equal(r, wantRow) { 612 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 613 } 614 mut = NewMutation() 615 mut.DeleteCellsInColumn("status", "start") 616 if err := table.Apply(ctx, "row3", mut); err != nil { 617 t.Fatalf("Delete column: %v", err) 618 } 619 r, err = table.ReadRow(ctx, "row3") 620 if err != nil { 621 t.Fatalf("Reading row: %v", err) 622 } 623 wantRow = Row{ 624 "status": []ReadItem{ 625 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 626 }, 627 } 628 if !testutil.Equal(r, wantRow) { 629 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, 
wantRow) 630 } 631 mut = NewMutation() 632 mut.DeleteCellsInColumn("status", "end") 633 if err := table.Apply(ctx, "row3", mut); err != nil { 634 t.Fatalf("Delete column: %v", err) 635 } 636 r, err = table.ReadRow(ctx, "row3") 637 if err != nil { 638 t.Fatalf("Reading row: %v", err) 639 } 640 if len(r) != 0 { 641 t.Fatalf("Delete column: got %v, want empty row", r) 642 } 643 // Add same cell after delete 644 mut = NewMutation() 645 mut.Set("status", "end", 1000, []byte("1")) 646 if err := table.Apply(ctx, "row3", mut); err != nil { 647 t.Fatalf("Mutating row: %v", err) 648 } 649 r, err = table.ReadRow(ctx, "row3") 650 if err != nil { 651 t.Fatalf("Reading row: %v", err) 652 } 653 if !testutil.Equal(r, wantRow) { 654 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 655 } 656} 657 658func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 659 ctx := context.Background() 660 _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 661 if err != nil { 662 t.Fatal(err) 663 } 664 defer cleanup() 665 666 if err := populatePresidentsGraph(table); err != nil { 667 t.Fatal(err) 668 } 669 670 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 671 t.Fatalf("Creating column family: %v", err) 672 } 673 674 // Do highly concurrent reads/writes. 675 const maxConcurrency = 1000 676 var wg sync.WaitGroup 677 for i := 0; i < maxConcurrency; i++ { 678 wg.Add(1) 679 go func() { 680 defer wg.Done() 681 switch r := rand.Intn(100); { // r ∈ [0,100) 682 case 0 <= r && r < 30: 683 // Do a read. 684 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 685 if err != nil { 686 t.Errorf("Concurrent read: %v", err) 687 } 688 case 30 <= r && r < 100: 689 // Do a write. 
690 mut := NewMutation() 691 mut.Set("ts", "col", 1000, []byte("data")) 692 if err := table.Apply(ctx, "testrow", mut); err != nil { 693 t.Errorf("Concurrent write: %v", err) 694 } 695 } 696 }() 697 } 698 wg.Wait() 699} 700 701func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 702 ctx := context.Background() 703 testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 704 if err != nil { 705 t.Fatal(err) 706 } 707 defer cleanup() 708 709 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 710 t.Fatalf("Creating column family: %v", err) 711 } 712 713 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 714 nonsense := []byte("lorem ipsum dolor sit amet, ") 715 fill(bigBytes, nonsense) 716 mut := NewMutation() 717 mut.Set("ts", "col", 1000, bigBytes) 718 if err := table.Apply(ctx, "bigrow", mut); err != nil { 719 t.Fatalf("Big write: %v", err) 720 } 721 verifyDirectPathRemoteAddress(testEnv, t) 722 r, err := table.ReadRow(ctx, "bigrow") 723 if err != nil { 724 t.Fatalf("Big read: %v", err) 725 } 726 verifyDirectPathRemoteAddress(testEnv, t) 727 wantRow := Row{"ts": []ReadItem{ 728 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 729 }} 730 if !testutil.Equal(r, wantRow) { 731 t.Fatalf("Big read returned incorrect bytes: %v", r) 732 } 733 734 var wg sync.WaitGroup 735 // Now write 1000 rows, each with 82 KB values, then scan them all. 736 medBytes := make([]byte, 82<<10) 737 fill(medBytes, nonsense) 738 sem := make(chan int, 50) // do up to 50 mutations at a time. 
739 for i := 0; i < 1000; i++ { 740 mut := NewMutation() 741 mut.Set("ts", "big-scan", 1000, medBytes) 742 row := fmt.Sprintf("row-%d", i) 743 wg.Add(1) 744 go func() { 745 defer wg.Done() 746 defer func() { <-sem }() 747 sem <- 1 748 if err := table.Apply(ctx, row, mut); err != nil { 749 t.Errorf("Preparing large scan: %v", err) 750 } 751 verifyDirectPathRemoteAddress(testEnv, t) 752 }() 753 } 754 wg.Wait() 755 n := 0 756 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 757 for _, ris := range r { 758 for _, ri := range ris { 759 n += len(ri.Value) 760 } 761 } 762 return true 763 }, RowFilter(ColumnFilter("big-scan"))) 764 if err != nil { 765 t.Fatalf("Doing large scan: %v", err) 766 } 767 verifyDirectPathRemoteAddress(testEnv, t) 768 if want := 1000 * len(medBytes); n != want { 769 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 770 } 771 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 772 rc := 0 773 wantRc := 3 774 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 775 rc++ 776 return true 777 }, LimitRows(int64(wantRc))) 778 if err != nil { 779 t.Fatal(err) 780 } 781 verifyDirectPathRemoteAddress(testEnv, t) 782 if rc != wantRc { 783 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 784 } 785 786 // Test bulk mutations 787 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 788 t.Fatalf("Creating column family: %v", err) 789 } 790 bulkData := map[string][]string{ 791 "red sox": {"2004", "2007", "2013"}, 792 "patriots": {"2001", "2003", "2004", "2014"}, 793 "celtics": {"1981", "1984", "1986", "2008"}, 794 } 795 var rowKeys []string 796 var muts []*Mutation 797 for row, ss := range bulkData { 798 mut := NewMutation() 799 for _, name := range ss { 800 mut.Set("bulk", name, 1000, []byte("1")) 801 } 802 rowKeys = append(rowKeys, row) 803 muts = append(muts, mut) 804 } 805 status, err := table.ApplyBulk(ctx, rowKeys, muts) 806 if err != nil 
{ 807 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 808 } 809 verifyDirectPathRemoteAddress(testEnv, t) 810 if status != nil { 811 t.Fatalf("non-nil errors: %v", err) 812 } 813 814 // Read each row back 815 for rowKey, ss := range bulkData { 816 row, err := table.ReadRow(ctx, rowKey) 817 if err != nil { 818 t.Fatalf("Reading a bulk row: %v", err) 819 } 820 verifyDirectPathRemoteAddress(testEnv, t) 821 var wantItems []ReadItem 822 for _, val := range ss { 823 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 824 } 825 wantRow := Row{"bulk": wantItems} 826 if !testutil.Equal(row, wantRow) { 827 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 828 } 829 } 830 831 // Test bulk write errors. 832 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 833 badMut := NewMutation() 834 badMut.Set("badfamily", "col", ServerTime, nil) 835 badMut2 := NewMutation() 836 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 837 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 838 if err != nil { 839 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 840 } 841 verifyDirectPathRemoteAddress(testEnv, t) 842 if status == nil { 843 t.Fatalf("No errors for bad bulk mutation") 844 } else if status[0] == nil || status[1] == nil { 845 t.Fatalf("No error for bad bulk mutation") 846 } 847} 848 849func TestIntegration_Read(t *testing.T) { 850 ctx := context.Background() 851 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 852 if err != nil { 853 t.Fatal(err) 854 } 855 defer cleanup() 856 857 // Insert some data. 
858 initialData := map[string][]string{ 859 "wmckinley": {"tjefferson"}, 860 "gwashington": {"j§adams"}, 861 "tjefferson": {"gwashington", "j§adams", "wmckinley"}, 862 "j§adams": {"gwashington", "tjefferson"}, 863 } 864 for row, ss := range initialData { 865 mut := NewMutation() 866 for _, name := range ss { 867 mut.Set("follows", name, 1000, []byte("1")) 868 } 869 if err := table.Apply(ctx, row, mut); err != nil { 870 t.Fatalf("Mutating row %q: %v", row, err) 871 } 872 verifyDirectPathRemoteAddress(testEnv, t) 873 } 874 875 for _, test := range []struct { 876 desc string 877 rr RowSet 878 filter Filter // may be nil 879 limit ReadOption // may be nil 880 881 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 882 // and join with a comma. 883 want string 884 wantLabels []string 885 }{ 886 { 887 desc: "read all, unfiltered", 888 rr: RowRange{}, 889 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 890 }, 891 { 892 desc: "read with InfiniteRange, unfiltered", 893 rr: InfiniteRange("tjefferson"), 894 want: "tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 895 }, 896 { 897 desc: "read with NewRange, unfiltered", 898 rr: NewRange("gargamel", "hubbard"), 899 want: "gwashington-j§adams-1", 900 }, 901 { 902 desc: "read with PrefixRange, unfiltered", 903 rr: PrefixRange("j§ad"), 904 want: "j§adams-gwashington-1,j§adams-tjefferson-1", 905 }, 906 { 907 desc: "read with SingleRow, unfiltered", 908 rr: SingleRow("wmckinley"), 909 want: "wmckinley-tjefferson-1", 910 }, 911 { 912 desc: "read all, with ColumnFilter", 913 rr: RowRange{}, 914 filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson" 915 want: "gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,wmckinley-tjefferson-1", 916 }, 917 { 918 desc: "read all, with ColumnFilter, prefix", 919 rr: RowRange{}, 920 filter: 
ColumnFilter("j"), // no matches 921 want: "", 922 }, 923 { 924 desc: "read range, with ColumnRangeFilter", 925 rr: RowRange{}, 926 filter: ColumnRangeFilter("follows", "h", "k"), 927 want: "gwashington-j§adams-1,tjefferson-j§adams-1", 928 }, 929 { 930 desc: "read range from empty, with ColumnRangeFilter", 931 rr: RowRange{}, 932 filter: ColumnRangeFilter("follows", "", "u"), 933 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1", 934 }, 935 { 936 desc: "read range from start to empty, with ColumnRangeFilter", 937 rr: RowRange{}, 938 filter: ColumnRangeFilter("follows", "h", ""), 939 want: "gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 940 }, 941 { 942 desc: "read with RowKeyFilter", 943 rr: RowRange{}, 944 filter: RowKeyFilter(".*wash.*"), 945 want: "gwashington-j§adams-1", 946 }, 947 { 948 desc: "read with RowKeyFilter unicode", 949 rr: RowRange{}, 950 filter: RowKeyFilter(".*j§.*"), 951 want: "j§adams-gwashington-1,j§adams-tjefferson-1", 952 }, 953 { 954 desc: "read with RowKeyFilter escaped", 955 rr: RowRange{}, 956 filter: RowKeyFilter(`.*j\xC2\xA7.*`), 957 want: "j§adams-gwashington-1,j§adams-tjefferson-1", 958 }, 959 { 960 desc: "read with RowKeyFilter, prefix", 961 rr: RowRange{}, 962 filter: RowKeyFilter("gwash"), 963 want: "", 964 }, 965 { 966 desc: "read with RowKeyFilter, no matches", 967 rr: RowRange{}, 968 filter: RowKeyFilter(".*xxx.*"), 969 want: "", 970 }, 971 { 972 desc: "read with FamilyFilter, no matches", 973 rr: RowRange{}, 974 filter: FamilyFilter(".*xxx.*"), 975 want: "", 976 }, 977 { 978 desc: "read with ColumnFilter + row limit", 979 rr: RowRange{}, 980 filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson" 981 limit: LimitRows(2), 982 want: "gwashington-j§adams-1,j§adams-tjefferson-1", 983 }, 984 { 985 desc: "apply labels to the result rows", 986 rr: RowRange{}, 987 
filter: LabelFilter("test-label"), 988 limit: LimitRows(2), 989 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1", 990 wantLabels: []string{"test-label", "test-label", "test-label"}, 991 }, 992 { 993 desc: "read all, strip values", 994 rr: RowRange{}, 995 filter: StripValueFilter(), 996 want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 997 }, 998 { 999 desc: "read with ColumnFilter + row limit + strip values", 1000 rr: RowRange{}, 1001 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson" 1002 limit: LimitRows(2), 1003 want: "gwashington-j§adams-,j§adams-tjefferson-", 1004 }, 1005 { 1006 desc: "read with condition, strip values on true", 1007 rr: RowRange{}, 1008 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 1009 want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 1010 }, 1011 { 1012 desc: "read with condition, strip values on false", 1013 rr: RowRange{}, 1014 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 1015 want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 1016 }, 1017 { 1018 desc: "read with ValueRangeFilter + row limit", 1019 rr: RowRange{}, 1020 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 1021 limit: LimitRows(2), 1022 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1", 1023 }, 1024 { 1025 desc: "read with ValueRangeFilter, no match on exclusive end", 1026 rr: RowRange{}, 1027 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 1028 want: "", 1029 }, 1030 { 1031 desc: "read with ValueRangeFilter, no matches", 1032 rr: RowRange{}, 1033 filter: 
ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 1034 want: "", 1035 }, 1036 { 1037 desc: "read with InterleaveFilter, no matches on all filters", 1038 rr: RowRange{}, 1039 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 1040 want: "", 1041 }, 1042 { 1043 desc: "read with InterleaveFilter, no duplicate cells", 1044 rr: RowRange{}, 1045 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 1046 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1", 1047 }, 1048 { 1049 desc: "read with InterleaveFilter, with duplicate cells", 1050 rr: RowRange{}, 1051 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 1052 want: "j§adams-gwashington-1,j§adams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 1053 }, 1054 { 1055 desc: "read with a RowRangeList and no filter", 1056 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 1057 want: "gwashington-j§adams-1,wmckinley-tjefferson-1", 1058 }, 1059 { 1060 desc: "chain that excludes rows and matches nothing, in a condition", 1061 rr: RowRange{}, 1062 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 1063 want: "", 1064 }, 1065 { 1066 desc: "chain that ends with an interleave that has no match. 
covers #804", 1067 rr: RowRange{}, 1068 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 1069 want: "", 1070 }, 1071 } { 1072 t.Run(test.desc, func(t *testing.T) { 1073 var opts []ReadOption 1074 if test.filter != nil { 1075 opts = append(opts, RowFilter(test.filter)) 1076 } 1077 if test.limit != nil { 1078 opts = append(opts, test.limit) 1079 } 1080 var elt, labels []string 1081 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 1082 for _, ris := range r { 1083 for _, ri := range ris { 1084 labels = append(labels, ri.Labels...) 1085 elt = append(elt, formatReadItem(ri)) 1086 } 1087 } 1088 return true 1089 }, opts...) 1090 if err != nil { 1091 t.Fatal(err) 1092 } 1093 verifyDirectPathRemoteAddress(testEnv, t) 1094 if got := strings.Join(elt, ","); got != test.want { 1095 t.Fatalf("got %q\nwant %q", got, test.want) 1096 } 1097 if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) { 1098 t.Fatalf("got %q\nwant %q", got, want) 1099 } 1100 }) 1101 } 1102} 1103 1104func TestIntegration_SampleRowKeys(t *testing.T) { 1105 ctx := context.Background() 1106 testEnv, client, adminClient, _, _, cleanup, err := setupIntegration(ctx, t) 1107 if err != nil { 1108 t.Fatal(err) 1109 } 1110 defer cleanup() 1111 1112 presplitTable := fmt.Sprintf("presplit-table-%d", time.Now().Unix()) 1113 if err := adminClient.CreatePresplitTable(ctx, presplitTable, []string{"follows"}); err != nil { 1114 t.Fatal(err) 1115 } 1116 defer adminClient.DeleteTable(ctx, presplitTable) 1117 1118 if err := adminClient.CreateColumnFamily(ctx, presplitTable, "follows"); err != nil { 1119 t.Fatal(err) 1120 } 1121 1122 table := client.Open(presplitTable) 1123 1124 // Insert some data. 
1125 initialData := map[string][]string{ 1126 "wmckinley11": {"tjefferson11"}, 1127 "gwashington77": {"j§adams77"}, 1128 "tjefferson0": {"gwashington0", "j§adams0"}, 1129 } 1130 1131 for row, ss := range initialData { 1132 mut := NewMutation() 1133 for _, name := range ss { 1134 mut.Set("follows", name, 1000, []byte("1")) 1135 } 1136 if err := table.Apply(ctx, row, mut); err != nil { 1137 t.Fatalf("Mutating row %q: %v", row, err) 1138 } 1139 verifyDirectPathRemoteAddress(testEnv, t) 1140 } 1141 sampleKeys, err := table.SampleRowKeys(context.Background()) 1142 if err != nil { 1143 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1144 } 1145 if len(sampleKeys) == 0 { 1146 t.Error("SampleRowKeys length 0") 1147 } 1148} 1149 1150func TestIntegration_Admin(t *testing.T) { 1151 testEnv, err := NewIntegrationEnv() 1152 if err != nil { 1153 t.Fatalf("IntegrationEnv: %v", err) 1154 } 1155 defer testEnv.Close() 1156 1157 timeout := 2 * time.Second 1158 if testEnv.Config().UseProd { 1159 timeout = 5 * time.Minute 1160 } 1161 ctx, _ := context.WithTimeout(context.Background(), timeout) 1162 1163 adminClient, err := testEnv.NewAdminClient() 1164 if err != nil { 1165 t.Fatalf("NewAdminClient: %v", err) 1166 } 1167 defer adminClient.Close() 1168 1169 iAdminClient, err := testEnv.NewInstanceAdminClient() 1170 if err != nil { 1171 t.Fatalf("NewInstanceAdminClient: %v", err) 1172 } 1173 if iAdminClient != nil { 1174 defer iAdminClient.Close() 1175 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1176 if err != nil { 1177 t.Errorf("InstanceInfo: %v", err) 1178 } 1179 if iInfo.Name != adminClient.instance { 1180 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1181 } 1182 } 1183 1184 list := func() []string { 1185 tbls, err := adminClient.Tables(ctx) 1186 if err != nil { 1187 t.Fatalf("Fetching list of tables: %v", err) 1188 } 1189 sort.Strings(tbls) 1190 return tbls 1191 } 1192 containsAll := func(got, want []string) bool { 1193 gotSet 
:= make(map[string]bool) 1194 1195 for _, s := range got { 1196 gotSet[s] = true 1197 } 1198 for _, s := range want { 1199 if !gotSet[s] { 1200 return false 1201 } 1202 } 1203 return true 1204 } 1205 1206 defer deleteTable(ctx, t, adminClient, myTableName) 1207 1208 if err := adminClient.CreateTable(ctx, myTableName); err != nil { 1209 t.Fatalf("Creating table: %v", err) 1210 } 1211 1212 defer deleteTable(ctx, t, adminClient, myOtherTableName) 1213 1214 if err := adminClient.CreateTable(ctx, myOtherTableName); err != nil { 1215 t.Fatalf("Creating table: %v", err) 1216 } 1217 1218 if got, want := list(), []string{myOtherTableName, myTableName}; !containsAll(got, want) { 1219 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1220 } 1221 1222 must(adminClient.WaitForReplication(ctx, myTableName)) 1223 1224 if err := adminClient.DeleteTable(ctx, myOtherTableName); err != nil { 1225 t.Fatalf("Deleting table: %v", err) 1226 } 1227 tables := list() 1228 if got, want := tables, []string{myTableName}; !containsAll(got, want) { 1229 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1230 } 1231 if got, unwanted := tables, []string{myOtherTableName}; containsAll(got, unwanted) { 1232 t.Errorf("adminClient.Tables return %#v. 
unwanted %#v", got, unwanted) 1233 } 1234 1235 tblConf := TableConf{ 1236 TableID: "conftable", 1237 Families: map[string]GCPolicy{ 1238 "fam1": MaxVersionsPolicy(1), 1239 "fam2": MaxVersionsPolicy(2), 1240 }, 1241 } 1242 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1243 t.Fatalf("Creating table from TableConf: %v", err) 1244 } 1245 defer deleteTable(ctx, t, adminClient, tblConf.TableID) 1246 1247 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1248 if err != nil { 1249 t.Fatalf("Getting table info: %v", err) 1250 } 1251 sort.Strings(tblInfo.Families) 1252 wantFams := []string{"fam1", "fam2"} 1253 if !testutil.Equal(tblInfo.Families, wantFams) { 1254 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1255 } 1256 1257 // Populate mytable and drop row ranges 1258 if err = adminClient.CreateColumnFamily(ctx, myTableName, "cf"); err != nil { 1259 t.Fatalf("Creating column family: %v", err) 1260 } 1261 1262 client, err := testEnv.NewClient() 1263 if err != nil { 1264 t.Fatalf("NewClient: %v", err) 1265 } 1266 defer client.Close() 1267 1268 tbl := client.Open(myTableName) 1269 1270 prefixes := []string{"a", "b", "c"} 1271 for _, prefix := range prefixes { 1272 for i := 0; i < 5; i++ { 1273 mut := NewMutation() 1274 mut.Set("cf", "col", 1000, []byte("1")) 1275 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1276 t.Fatalf("Mutating row: %v", err) 1277 } 1278 } 1279 } 1280 1281 if err = adminClient.DropRowRange(ctx, myTableName, "a"); err != nil { 1282 t.Errorf("DropRowRange a: %v", err) 1283 } 1284 if err = adminClient.DropRowRange(ctx, myTableName, "c"); err != nil { 1285 t.Errorf("DropRowRange c: %v", err) 1286 } 1287 if err = adminClient.DropRowRange(ctx, myTableName, "x"); err != nil { 1288 t.Errorf("DropRowRange x: %v", err) 1289 } 1290 1291 var gotRowCount int 1292 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1293 gotRowCount++ 1294 if 
!strings.HasPrefix(row.Key(), "b") { 1295 t.Errorf("Invalid row after dropping range: %v", row) 1296 } 1297 return true 1298 })) 1299 if gotRowCount != 5 { 1300 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1301 } 1302 1303 if err = adminClient.DropAllRows(ctx, myTableName); err != nil { 1304 t.Errorf("DropAllRows mytable: %v", err) 1305 } 1306 1307 gotRowCount = 0 1308 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1309 gotRowCount++ 1310 return true 1311 })) 1312 if gotRowCount != 0 { 1313 t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0) 1314 } 1315 1316 // Validate Encryption Info configured to default. (not supported by emulator) 1317 if testEnv.Config().UseProd { 1318 encryptionInfo, err := adminClient.EncryptionInfo(ctx, myTableName) 1319 if err != nil { 1320 t.Fatalf("EncryptionInfo: %v", err) 1321 } 1322 if got, want := len(encryptionInfo), 1; !cmp.Equal(got, want) { 1323 t.Fatalf("Number of Clusters with Encryption Info: %v, want: %v", got, want) 1324 } 1325 1326 clusterEncryptionInfo := encryptionInfo[testEnv.Config().Cluster][0] 1327 if clusterEncryptionInfo.KMSKeyVersion != "" { 1328 t.Errorf("Encryption Info mismatch, got %v, want %v", clusterEncryptionInfo.KMSKeyVersion, 0) 1329 } 1330 if clusterEncryptionInfo.Type != GoogleDefaultEncryption { 1331 t.Errorf("Encryption Info mismatch, got %v, want %v", clusterEncryptionInfo.Type, GoogleDefaultEncryption) 1332 } 1333 } 1334 1335} 1336 1337func TestIntegration_TableIam(t *testing.T) { 1338 testEnv, err := NewIntegrationEnv() 1339 if err != nil { 1340 t.Fatalf("IntegrationEnv: %v", err) 1341 } 1342 defer testEnv.Close() 1343 1344 if !testEnv.Config().UseProd { 1345 t.Skip("emulator doesn't support IAM Policy creation") 1346 } 1347 1348 timeout := 5 * time.Minute 1349 ctx, _ := context.WithTimeout(context.Background(), timeout) 1350 1351 adminClient, err := testEnv.NewAdminClient() 1352 if err != nil { 1353 
t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	defer deleteTable(ctx, t, adminClient, myTableName)
	if err := adminClient.CreateTable(ctx, myTableName); err != nil {
		t.Fatalf("Creating table: %v", err)
	}

	// Verify that the IAM Controls work for Tables.
	iamHandle := adminClient.TableIAM(myTableName)
	p, err := iamHandle.Policy(ctx)
	if err != nil {
		t.Fatalf("Iam GetPolicy mytable: %v", err)
	}
	if err = iamHandle.SetPolicy(ctx, p); err != nil {
		t.Errorf("Iam SetPolicy mytable: %v", err)
	}
	if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil {
		t.Errorf("Iam TestPermissions mytable: %v", err)
	}
}

// TestIntegration_BackupIAM creates a table and a backup of it, then checks
// the backup's IAM handle: reading the initial policy, adding a viewer
// member and reading it back, and testing backup permissions. It is skipped
// unless the environment is configured to use prod.
func TestIntegration_BackupIAM(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	if !testEnv.Config().UseProd {
		t.Skip("emulator doesn't support IAM Policy creation")
	}
	timeout := 5 * time.Minute
	// Keep the cancel func so the context's timer is released when the test
	// returns. It is deferred first so it runs after the cleanup deferred
	// below, which still needs ctx.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	table := testEnv.Config().Table
	cluster := testEnv.Config().Cluster

	defer deleteTable(ctx, t, adminClient, table)
	if err := adminClient.CreateTable(ctx, table); err != nil {
		t.Fatalf("Creating table: %v", err)
	}
	// Create backup.
	backup := "backup"
	defer adminClient.DeleteBackup(ctx, cluster, backup)
	if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil {
		t.Fatalf("Creating backup: %v", err)
	}
	iamHandle := adminClient.BackupIAM(cluster, backup)
	// Get backup policy. A failure is fatal: p is used immediately below
	// and must not be touched when Policy returned an error.
	p, err := iamHandle.Policy(ctx)
	if err != nil {
		t.Fatalf("iamHandle.Policy: %v", err)
	}
	// The resource is new, so the policy should be empty.
	if got := p.Roles(); len(got) > 0 {
		t.Errorf("got roles %v, want none", got)
	}
	// Set backup policy.
	member := "domain:google.com"
	// Add a member, set the policy, then check that the member is present.
	p.Add(member, iam.Viewer)
	if err = iamHandle.SetPolicy(ctx, p); err != nil {
		t.Errorf("iamHandle.SetPolicy: %v", err)
	}
	// Re-read the policy; again fatal on error because p is replaced by the
	// call's result and read right after.
	p, err = iamHandle.Policy(ctx)
	if err != nil {
		t.Fatalf("iamHandle.Policy: %v", err)
	}
	if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) {
		t.Errorf("iamHandle.Policy: got %v, want %v", got, want)
	}
	// Test backup permissions.
	permissions := []string{"bigtable.backups.get", "bigtable.backups.update"}
	if _, err = iamHandle.TestPermissions(ctx, permissions); err != nil {
		t.Errorf("iamHandle.TestPermissions: %v", err)
	}
}

func TestIntegration_AdminCreateInstance(t *testing.T) {
	if instanceToCreate == "" {
		t.Skip("instanceToCreate not set, skipping instance creation testing")
	}

	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	if !testEnv.Config().UseProd {
		t.Skip("emulator doesn't support instance creation")
	}

	timeout := 5 * time.Minute
	ctx, _ := context.WithTimeout(context.Background(), timeout)

	iAdminClient, err := testEnv.NewInstanceAdminClient()
	if err != nil {
		t.Fatalf("NewInstanceAdminClient: %v", err)
	}
	defer iAdminClient.Close()

	clusterID := instanceToCreate + "-cluster"

	// Create a development instance
	conf := &InstanceConf{
		InstanceId:   instanceToCreate,
		ClusterId:    clusterID,
		DisplayName:  "test instance",
Zone: instanceToCreateZone, 1472 InstanceType: DEVELOPMENT, 1473 Labels: map[string]string{"test-label-key": "test-label-value"}, 1474 } 1475 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1476 t.Fatalf("CreateInstance: %v", err) 1477 } 1478 1479 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1480 1481 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1482 if err != nil { 1483 t.Fatalf("InstanceInfo: %v", err) 1484 } 1485 1486 // Basic return values are tested elsewhere, check instance type 1487 if iInfo.InstanceType != DEVELOPMENT { 1488 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1489 } 1490 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1491 t.Fatalf("Labels: %v, want: %v", got, want) 1492 } 1493 1494 // Update everything we can about the instance in one call. 1495 confWithClusters := &InstanceWithClustersConfig{ 1496 InstanceID: instanceToCreate, 1497 DisplayName: "new display name", 1498 InstanceType: PRODUCTION, 1499 Labels: map[string]string{"new-label-key": "new-label-value"}, 1500 Clusters: []ClusterConfig{ 1501 {ClusterID: clusterID, NumNodes: 5}, 1502 }, 1503 } 1504 1505 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1506 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1507 } 1508 1509 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1510 if err != nil { 1511 t.Fatalf("InstanceInfo: %v", err) 1512 } 1513 1514 if iInfo.InstanceType != PRODUCTION { 1515 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1516 } 1517 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1518 t.Fatalf("Labels: %v, want: %v", got, want) 1519 } 1520 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1521 t.Fatalf("Display name: %q, want: %q", got, want) 1522 } 1523 1524 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1525 if err != nil { 1526 
t.Fatalf("GetCluster: %v", err) 1527 } 1528 1529 if cInfo.ServeNodes != 5 { 1530 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1531 } 1532 1533 if cInfo.KMSKeyName != "" { 1534 t.Fatalf("KMSKeyName: %v, want: %v", cInfo.KMSKeyName, "") 1535 } 1536} 1537 1538func TestIntegration_AdminEncryptionInfo(t *testing.T) { 1539 t.Skip("https://github.com/googleapis/google-cloud-go/issues/4173") 1540 if instanceToCreate == "" { 1541 t.Skip("instanceToCreate not set, skipping instance creation testing") 1542 } 1543 1544 testEnv, err := NewIntegrationEnv() 1545 if err != nil { 1546 t.Fatalf("IntegrationEnv: %v", err) 1547 } 1548 defer testEnv.Close() 1549 1550 if !testEnv.Config().UseProd { 1551 t.Skip("emulator doesn't support instance creation") 1552 } 1553 1554 // adjust test environment to use our cluster to create 1555 c := testEnv.Config() 1556 c.Instance = instanceToCreate 1557 testEnv, err = NewProdEnv(c) 1558 if err != nil { 1559 t.Fatalf("NewProdEnv: %v", err) 1560 } 1561 1562 timeout := 5 * time.Minute 1563 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1564 defer cancel() 1565 1566 iAdminClient, err := testEnv.NewInstanceAdminClient() 1567 if err != nil { 1568 t.Fatalf("NewInstanceAdminClient: %v", err) 1569 } 1570 defer iAdminClient.Close() 1571 1572 adminClient, err := testEnv.NewAdminClient() 1573 if err != nil { 1574 t.Fatalf("NewAdminClient: %v", err) 1575 } 1576 defer adminClient.Close() 1577 1578 table := instanceToCreate + "-table" 1579 clusterID := instanceToCreate + "-cluster" 1580 1581 keyRingName := os.Getenv("GCLOUD_TESTS_BIGTABLE_KEYRING") 1582 if keyRingName == "" { 1583 // try to fall back on GOLANG keyring 1584 keyRingName = os.Getenv("GCLOUD_TESTS_GOLANG_KEYRING") 1585 if keyRingName == "" { 1586 t.Fatal("GCLOUD_TESTS_BIGTABLE_KEYRING or GCLOUD_TESTS_GOLANG_KEYRING must be set. 
See CONTRIBUTING.md for details") 1587 } 1588 } 1589 kmsKeyName := keyRingName + "/cryptoKeys/key1" 1590 1591 conf := &InstanceWithClustersConfig{ 1592 InstanceID: instanceToCreate, 1593 DisplayName: "test instance", 1594 Clusters: []ClusterConfig{ 1595 { 1596 ClusterID: clusterID, 1597 KMSKeyName: kmsKeyName, 1598 Zone: instanceToCreateZone, 1599 NumNodes: 1, 1600 }, 1601 }, 1602 } 1603 if err := iAdminClient.CreateInstanceWithClusters(ctx, conf); err != nil { 1604 t.Fatalf("CreateInstance: %v", err) 1605 } 1606 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1607 1608 // Delete the table at the end of the test. Schedule ahead of time 1609 // in case the client fails 1610 defer deleteTable(ctx, t, adminClient, table) 1611 if err := adminClient.CreateTable(ctx, table); err != nil { 1612 t.Fatalf("Creating table: %v", err) 1613 } 1614 1615 encryptionKeyVersion := kmsKeyName + "/cryptoKeyVersions/1" 1616 1617 // The encryption info can take 30-300s (currently about 120-190s) to 1618 // become ready. 
1619 for i := 0; i < 30; i++ { 1620 encryptionInfo, err := adminClient.EncryptionInfo(ctx, table) 1621 if err != nil { 1622 t.Fatalf("EncryptionInfo: %v", err) 1623 } 1624 1625 kmsKeyVersion := encryptionInfo[clusterID][0].KMSKeyVersion 1626 if kmsKeyVersion != "" { 1627 break 1628 } 1629 1630 time.Sleep(time.Second * 10) 1631 } 1632 1633 // Validate Encryption Info under getTable 1634 table2, err := adminClient.getTable(ctx, table, btapb.Table_ENCRYPTION_VIEW) 1635 if err != nil { 1636 t.Fatalf("Getting Table: %v", err) 1637 } 1638 if got, want := len(table2.ClusterStates), 1; !cmp.Equal(got, want) { 1639 t.Fatalf("Table Cluster States %v, want: %v", got, want) 1640 } 1641 clusterState := table2.ClusterStates[clusterID] 1642 if got, want := len(clusterState.EncryptionInfo), 1; !cmp.Equal(got, want) { 1643 t.Fatalf("Table Encryption Info Length: %v, want: %v", got, want) 1644 } 1645 tableEncInfo := clusterState.EncryptionInfo[0] 1646 if got, want := int(tableEncInfo.EncryptionStatus.Code), 0; !cmp.Equal(got, want) { 1647 t.Fatalf("EncryptionStatus: %v, want: %v", got, want) 1648 } 1649 // NOTE: this EncryptionType is btapb.EncryptionInfo_EncryptionType 1650 if got, want := tableEncInfo.EncryptionType, btapb.EncryptionInfo_CUSTOMER_MANAGED_ENCRYPTION; !cmp.Equal(got, want) { 1651 t.Fatalf("EncryptionType: %v, want: %v", got, want) 1652 } 1653 if got, want := tableEncInfo.KmsKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) { 1654 t.Fatalf("KMS Key Version: %v, want: %v", got, want) 1655 } 1656 1657 // Validate Encyrption Info retrieved via EncryptionInfo 1658 encryptionInfo, err := adminClient.EncryptionInfo(ctx, table) 1659 if err != nil { 1660 t.Fatalf("EncryptionInfo: %v", err) 1661 } 1662 if got, want := len(encryptionInfo), 1; !cmp.Equal(got, want) { 1663 t.Fatalf("Number of Clusters with Encryption Info: %v, want: %v", got, want) 1664 } 1665 encryptionInfos := encryptionInfo[clusterID] 1666 if got, want := len(encryptionInfos), 1; !cmp.Equal(got, want) { 
1667 t.Fatalf("Encryption Info Length: %v, want: %v", got, want) 1668 } 1669 if len(encryptionInfos) != 1 { 1670 t.Fatalf("Expected Single EncryptionInfo") 1671 } 1672 v := encryptionInfos[0] 1673 if got, want := int(v.Status.Code), 0; !cmp.Equal(got, want) { 1674 t.Fatalf("EncryptionStatus: %v, want: %v", got, want) 1675 } 1676 // NOTE: this EncryptionType is EncryptionType 1677 if got, want := v.Type, CustomerManagedEncryption; !cmp.Equal(got, want) { 1678 t.Fatalf("EncryptionType: %v, want: %v", got, want) 1679 } 1680 if got, want := v.KMSKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) { 1681 t.Fatalf("KMS Key Version: %v, want: %v", got, want) 1682 } 1683 1684 // Validate CMEK on Cluster Info 1685 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1686 if err != nil { 1687 t.Fatalf("GetCluster: %v", err) 1688 } 1689 1690 if got, want := cInfo.KMSKeyName, kmsKeyName; !cmp.Equal(got, want) { 1691 t.Fatalf("KMSKeyName: %v, want: %v", got, want) 1692 } 1693 1694 // Create a backup with CMEK enabled, verify backup encryption info 1695 backupName := "backupCMEK" 1696 defer adminClient.DeleteBackup(ctx, clusterID, backupName) 1697 if err = adminClient.CreateBackup(ctx, table, clusterID, backupName, time.Now().Add(8*time.Hour)); err != nil { 1698 t.Fatalf("Creating backup: %v", err) 1699 } 1700 backup, err := adminClient.BackupInfo(ctx, clusterID, backupName) 1701 if err != nil { 1702 t.Fatalf("BackupInfo: %v", backup) 1703 } 1704 1705 if got, want := backup.EncryptionInfo.Type, CustomerManagedEncryption; !cmp.Equal(got, want) { 1706 t.Fatalf("Backup Encryption EncryptionType: %v, want: %v", got, want) 1707 } 1708 if got, want := backup.EncryptionInfo.KMSKeyVersion, encryptionKeyVersion; !cmp.Equal(got, want) { 1709 t.Fatalf("Backup Encryption KMSKeyVersion: %v, want: %v", got, want) 1710 } 1711 if got, want := int(backup.EncryptionInfo.Status.Code), 2; !cmp.Equal(got, want) { 1712 t.Fatalf("Backup EncryptionStatus: %v, want: %v", got, 
want) 1713 } 1714} 1715 1716func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) { 1717 // Check the environments 1718 if instanceToCreate == "" { 1719 t.Skip("instanceToCreate not set, skipping instance creation testing") 1720 } 1721 testEnv, err := NewIntegrationEnv() 1722 if err != nil { 1723 t.Fatalf("IntegrationEnv: %v", err) 1724 } 1725 defer testEnv.Close() 1726 if !testEnv.Config().UseProd { 1727 t.Skip("emulator doesn't support instance creation") 1728 } 1729 1730 // Create an instance admin client 1731 timeout := 5 * time.Minute 1732 ctx, _ := context.WithTimeout(context.Background(), timeout) 1733 iAdminClient, err := testEnv.NewInstanceAdminClient() 1734 if err != nil { 1735 t.Fatalf("NewInstanceAdminClient: %v", err) 1736 } 1737 defer iAdminClient.Close() 1738 1739 // Create a test instance 1740 conf := &InstanceConf{ 1741 InstanceId: instanceToCreate, 1742 ClusterId: instanceToCreate + "-cluster", 1743 DisplayName: "test instance", 1744 InstanceType: DEVELOPMENT, 1745 Zone: instanceToCreateZone, 1746 } 1747 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1748 t.Fatalf("CreateInstance: %v", err) 1749 } 1750 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1751 1752 // Check the created test instances 1753 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1754 if err != nil { 1755 t.Fatalf("InstanceInfo: %v", err) 1756 } 1757 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1758 t.Fatalf("Labels: %v, want: %v", got, want) 1759 } 1760 1761 // Test patterns to update instance labels 1762 tests := []struct { 1763 name string 1764 in map[string]string 1765 out map[string]string 1766 }{ 1767 { 1768 name: "update labels", 1769 in: map[string]string{"test-label-key": "test-label-value"}, 1770 out: map[string]string{"test-label-key": "test-label-value"}, 1771 }, 1772 { 1773 name: "update multiple labels", 1774 in: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": 
"update-label-value-b"}, 1775 out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1776 }, 1777 { 1778 name: "not update existing labels", 1779 in: nil, // nil map 1780 out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1781 }, 1782 { 1783 name: "delete labels", 1784 in: map[string]string{}, // empty map 1785 out: nil, 1786 }, 1787 } 1788 for _, tt := range tests { 1789 t.Run(tt.name, func(t *testing.T) { 1790 confWithClusters := &InstanceWithClustersConfig{ 1791 InstanceID: instanceToCreate, 1792 Labels: tt.in, 1793 } 1794 if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1795 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1796 } 1797 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1798 if err != nil { 1799 t.Fatalf("InstanceInfo: %v", err) 1800 } 1801 if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) { 1802 t.Fatalf("Labels: %v, want: %v", got, want) 1803 } 1804 }) 1805 } 1806} 1807 1808func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) { 1809 if instanceToCreate == "" { 1810 t.Skip("instanceToCreate not set, skipping instance update testing") 1811 } 1812 1813 testEnv, err := NewIntegrationEnv() 1814 if err != nil { 1815 t.Fatalf("IntegrationEnv: %v", err) 1816 } 1817 defer testEnv.Close() 1818 1819 if !testEnv.Config().UseProd { 1820 t.Skip("emulator doesn't support instance creation") 1821 } 1822 1823 timeout := 5 * time.Minute 1824 ctx, _ := context.WithTimeout(context.Background(), timeout) 1825 1826 iAdminClient, err := testEnv.NewInstanceAdminClient() 1827 if err != nil { 1828 t.Fatalf("NewInstanceAdminClient: %v", err) 1829 } 1830 defer iAdminClient.Close() 1831 1832 clusterID := instanceToCreate + "-cluster" 1833 1834 // Create a development instance 1835 conf := &InstanceConf{ 1836 InstanceId: instanceToCreate, 1837 ClusterId: clusterID, 1838 
DisplayName: "test instance", 1839 Zone: instanceToCreateZone, 1840 InstanceType: DEVELOPMENT, 1841 Labels: map[string]string{"test-label-key": "test-label-value"}, 1842 } 1843 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1844 t.Fatalf("CreateInstance: %v", err) 1845 } 1846 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1847 1848 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1849 if err != nil { 1850 t.Fatalf("InstanceInfo: %v", err) 1851 } 1852 1853 // Basic return values are tested elsewhere, check instance type 1854 if iInfo.InstanceType != DEVELOPMENT { 1855 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1856 } 1857 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1858 t.Fatalf("Labels: %v, want: %v", got, want) 1859 } 1860 1861 // Update everything we can about the instance in one call. 1862 confWithClusters := &InstanceWithClustersConfig{ 1863 InstanceID: instanceToCreate, 1864 DisplayName: "new display name", 1865 InstanceType: PRODUCTION, 1866 Labels: map[string]string{"new-label-key": "new-label-value"}, 1867 Clusters: []ClusterConfig{ 1868 {ClusterID: clusterID, NumNodes: 5}, 1869 }, 1870 } 1871 1872 results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1873 if err != nil { 1874 t.Fatalf("UpdateInstanceAndSyncClusters: %v", err) 1875 } 1876 1877 wantResults := UpdateInstanceResults{ 1878 InstanceUpdated: true, 1879 UpdatedClusters: []string{clusterID}, 1880 } 1881 if diff := testutil.Diff(*results, wantResults); diff != "" { 1882 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1883 } 1884 1885 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1886 if err != nil { 1887 t.Fatalf("InstanceInfo: %v", err) 1888 } 1889 1890 if iInfo.InstanceType != PRODUCTION { 1891 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1892 } 1893 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1894 
t.Fatalf("Labels: %v, want: %v", got, want) 1895 } 1896 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1897 t.Fatalf("Display name: %q, want: %q", got, want) 1898 } 1899 1900 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1901 if err != nil { 1902 t.Fatalf("GetCluster: %v", err) 1903 } 1904 1905 if cInfo.ServeNodes != 5 { 1906 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1907 } 1908 1909 // Now add a second cluster as the only change. The first cluster must also be provided so 1910 // it is not removed. 1911 clusterID2 := clusterID + "-2" 1912 confWithClusters = &InstanceWithClustersConfig{ 1913 InstanceID: instanceToCreate, 1914 Clusters: []ClusterConfig{ 1915 {ClusterID: clusterID}, 1916 {ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}, 1917 }, 1918 } 1919 1920 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1921 if err != nil { 1922 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1923 } 1924 1925 wantResults = UpdateInstanceResults{ 1926 InstanceUpdated: false, 1927 CreatedClusters: []string{clusterID2}, 1928 } 1929 if diff := testutil.Diff(*results, wantResults); diff != "" { 1930 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1931 } 1932 1933 // Now update one cluster and delete the other 1934 confWithClusters = &InstanceWithClustersConfig{ 1935 InstanceID: instanceToCreate, 1936 Clusters: []ClusterConfig{ 1937 {ClusterID: clusterID, NumNodes: 4}, 1938 }, 1939 } 1940 1941 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1942 if err != nil { 1943 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1944 } 1945 1946 wantResults = UpdateInstanceResults{ 1947 InstanceUpdated: false, 1948 UpdatedClusters: []string{clusterID}, 1949 DeletedClusters: []string{clusterID2}, 1950 } 1951 1952 if diff := testutil.Diff(*results, wantResults); diff != "" 
{ 1953 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1954 } 1955 1956 // Make sure the instance looks as we would expect 1957 clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId) 1958 if err != nil { 1959 t.Fatalf("Clusters: %v", err) 1960 } 1961 1962 if len(clusters) != 1 { 1963 t.Fatalf("Clusters length %v, want: 1", len(clusters)) 1964 } 1965 1966 wantCluster := &ClusterInfo{ 1967 Name: clusterID, 1968 Zone: instanceToCreateZone, 1969 ServeNodes: 4, 1970 State: "READY", 1971 } 1972 if diff := testutil.Diff(clusters[0], wantCluster); diff != "" { 1973 t.Fatalf("InstanceEquality: got - want +\n%s", diff) 1974 } 1975} 1976 1977// instanceAdminClientMock is used to test FailedLocations field processing. 1978type instanceAdminClientMock struct { 1979 Clusters []*btapb.Cluster 1980 UnavailableLocations []string 1981} 1982 1983func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) { 1984 res := btapb.ListClustersResponse{ 1985 Clusters: iacm.Clusters, 1986 FailedLocations: iacm.UnavailableLocations, 1987 } 1988 return &res, nil 1989} 1990 1991func (iacm *instanceAdminClientMock) CreateInstance(ctx context.Context, in *btapb.CreateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1992 return nil, nil 1993} 1994 1995func (iacm *instanceAdminClientMock) GetInstance(ctx context.Context, in *btapb.GetInstanceRequest, opts ...grpc.CallOption) (*btapb.Instance, error) { 1996 return nil, nil 1997} 1998 1999func (iacm *instanceAdminClientMock) ListInstances(ctx context.Context, in *btapb.ListInstancesRequest, opts ...grpc.CallOption) (*btapb.ListInstancesResponse, error) { 2000 return nil, nil 2001} 2002 2003func (iacm *instanceAdminClientMock) UpdateInstance(ctx context.Context, in *btapb.Instance, opts ...grpc.CallOption) (*btapb.Instance, error) { 2004 return nil, nil 2005} 2006 2007func (iacm *instanceAdminClientMock) 
PartialUpdateInstance(ctx context.Context, in *btapb.PartialUpdateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 2008 return nil, nil 2009} 2010 2011func (iacm *instanceAdminClientMock) DeleteInstance(ctx context.Context, in *btapb.DeleteInstanceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 2012 return nil, nil 2013} 2014 2015func (iacm *instanceAdminClientMock) CreateCluster(ctx context.Context, in *btapb.CreateClusterRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 2016 return nil, nil 2017} 2018 2019func (iacm *instanceAdminClientMock) GetCluster(ctx context.Context, in *btapb.GetClusterRequest, opts ...grpc.CallOption) (*btapb.Cluster, error) { 2020 return nil, nil 2021} 2022 2023func (iacm *instanceAdminClientMock) UpdateCluster(ctx context.Context, in *btapb.Cluster, opts ...grpc.CallOption) (*longrunning.Operation, error) { 2024 return nil, nil 2025} 2026 2027func (iacm *instanceAdminClientMock) DeleteCluster(ctx context.Context, in *btapb.DeleteClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 2028 return nil, nil 2029} 2030 2031func (iacm *instanceAdminClientMock) CreateAppProfile(ctx context.Context, in *btapb.CreateAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) { 2032 return nil, nil 2033} 2034 2035func (iacm *instanceAdminClientMock) GetAppProfile(ctx context.Context, in *btapb.GetAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) { 2036 return nil, nil 2037} 2038 2039func (iacm *instanceAdminClientMock) ListAppProfiles(ctx context.Context, in *btapb.ListAppProfilesRequest, opts ...grpc.CallOption) (*btapb.ListAppProfilesResponse, error) { 2040 return nil, nil 2041} 2042 2043func (iacm *instanceAdminClientMock) UpdateAppProfile(ctx context.Context, in *btapb.UpdateAppProfileRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 2044 return nil, nil 2045} 2046 2047func (iacm *instanceAdminClientMock) DeleteAppProfile(ctx 
context.Context, in *btapb.DeleteAppProfileRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 2048 return nil, nil 2049} 2050 2051func (iacm *instanceAdminClientMock) GetIamPolicy(ctx context.Context, in *v1.GetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) { 2052 return nil, nil 2053} 2054 2055func (iacm *instanceAdminClientMock) SetIamPolicy(ctx context.Context, in *v1.SetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) { 2056 return nil, nil 2057} 2058 2059func (iacm *instanceAdminClientMock) TestIamPermissions(ctx context.Context, in *v1.TestIamPermissionsRequest, opts ...grpc.CallOption) (*v1.TestIamPermissionsResponse, error) { 2060 return nil, nil 2061} 2062 2063func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) { 2064 testEnv, err := NewIntegrationEnv() 2065 if err != nil { 2066 t.Fatalf("IntegrationEnv: %v", err) 2067 } 2068 defer testEnv.Close() 2069 2070 if !testEnv.Config().UseProd { 2071 t.Skip("emulator doesn't support snapshots") 2072 } 2073 2074 iAdminClient, err := testEnv.NewInstanceAdminClient() 2075 if err != nil { 2076 t.Fatalf("NewInstanceAdminClient: %v", err) 2077 } 2078 defer iAdminClient.Close() 2079 2080 cluster1 := btapb.Cluster{Name: "cluster1"} 2081 failedLoc := "euro1" 2082 2083 iAdminClient.iClient = &instanceAdminClientMock{ 2084 Clusters: []*btapb.Cluster{&cluster1}, 2085 UnavailableLocations: []string{failedLoc}, 2086 } 2087 2088 cis, err := iAdminClient.Clusters(context.Background(), "instance-id") 2089 convertedErr, ok := err.(ErrPartiallyUnavailable) 2090 if !ok { 2091 t.Fatalf("want error ErrPartiallyUnavailable, got other") 2092 } 2093 if got, want := len(convertedErr.Locations), 1; got != want { 2094 t.Fatalf("want %v failed locations, got %v", want, got) 2095 } 2096 if got, want := convertedErr.Locations[0], failedLoc; got != want { 2097 t.Fatalf("want failed location %v, got %v", want, got) 2098 } 2099 if got, want := len(cis), 1; got != want { 2100 
t.Fatalf("want %v failed locations, got %v", want, got) 2101 } 2102 if got, want := cis[0].Name, cluster1.Name; got != want { 2103 t.Fatalf("want cluster %v, got %v", want, got) 2104 } 2105} 2106 2107func TestIntegration_Granularity(t *testing.T) { 2108 testEnv, err := NewIntegrationEnv() 2109 if err != nil { 2110 t.Fatalf("IntegrationEnv: %v", err) 2111 } 2112 defer testEnv.Close() 2113 2114 timeout := 2 * time.Second 2115 if testEnv.Config().UseProd { 2116 timeout = 5 * time.Minute 2117 } 2118 ctx, _ := context.WithTimeout(context.Background(), timeout) 2119 2120 adminClient, err := testEnv.NewAdminClient() 2121 if err != nil { 2122 t.Fatalf("NewAdminClient: %v", err) 2123 } 2124 defer adminClient.Close() 2125 2126 list := func() []string { 2127 tbls, err := adminClient.Tables(ctx) 2128 if err != nil { 2129 t.Fatalf("Fetching list of tables: %v", err) 2130 } 2131 sort.Strings(tbls) 2132 return tbls 2133 } 2134 containsAll := func(got, want []string) bool { 2135 gotSet := make(map[string]bool) 2136 2137 for _, s := range got { 2138 gotSet[s] = true 2139 } 2140 for _, s := range want { 2141 if !gotSet[s] { 2142 return false 2143 } 2144 } 2145 return true 2146 } 2147 2148 defer deleteTable(ctx, t, adminClient, myTableName) 2149 2150 if err := adminClient.CreateTable(ctx, myTableName); err != nil { 2151 t.Fatalf("Creating table: %v", err) 2152 } 2153 2154 tables := list() 2155 if got, want := tables, []string{myTableName}; !containsAll(got, want) { 2156 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 2157 } 2158 2159 // calling ModifyColumnFamilies to check the granularity of table 2160 prefix := adminClient.instancePrefix() 2161 req := &btapb.ModifyColumnFamiliesRequest{ 2162 Name: prefix + "/tables/" + myTableName, 2163 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 2164 Id: "cf", 2165 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, 2166 }}, 2167 } 2168 table, err := 
adminClient.tClient.ModifyColumnFamilies(ctx, req) 2169 if err != nil { 2170 t.Fatalf("Creating column family: %v", err) 2171 } 2172 if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { 2173 t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) 2174 } 2175} 2176 2177func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { 2178 testEnv, err := NewIntegrationEnv() 2179 if err != nil { 2180 t.Fatalf("IntegrationEnv: %v", err) 2181 } 2182 defer testEnv.Close() 2183 2184 timeout := 2 * time.Second 2185 if testEnv.Config().UseProd { 2186 timeout = 5 * time.Minute 2187 } 2188 ctx, cancel := context.WithTimeout(context.Background(), timeout) 2189 defer cancel() 2190 2191 adminClient, err := testEnv.NewAdminClient() 2192 if err != nil { 2193 t.Fatalf("NewAdminClient: %v", err) 2194 } 2195 defer adminClient.Close() 2196 2197 iAdminClient, err := testEnv.NewInstanceAdminClient() 2198 if err != nil { 2199 t.Fatalf("NewInstanceAdminClient: %v", err) 2200 } 2201 2202 if iAdminClient == nil { 2203 return 2204 } 2205 2206 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 2207 2208 defer iAdminClient.Close() 2209 profile := ProfileConf{ 2210 ProfileID: "app_profile1", 2211 InstanceID: adminClient.instance, 2212 ClusterID: testEnv.Config().Cluster, 2213 Description: "creating new app profile 1", 2214 RoutingPolicy: SingleClusterRouting, 2215 } 2216 2217 createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) 2218 if err != nil { 2219 t.Fatalf("Creating app profile: %v", err) 2220 } 2221 2222 gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 2223 if err != nil { 2224 t.Fatalf("Get app profile: %v", err) 2225 } 2226 2227 if !proto.Equal(createdProfile, gotProfile) { 2228 t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) 2229 } 2230 2231 list := 
func(instanceID string) ([]*btapb.AppProfile, error) { 2232 profiles := []*btapb.AppProfile(nil) 2233 2234 it := iAdminClient.ListAppProfiles(ctx, instanceID) 2235 for { 2236 s, err := it.Next() 2237 if err == iterator.Done { 2238 break 2239 } 2240 if err != nil { 2241 return nil, err 2242 } 2243 profiles = append(profiles, s) 2244 } 2245 return profiles, err 2246 } 2247 2248 profiles, err := list(adminClient.instance) 2249 if err != nil { 2250 t.Fatalf("List app profile: %v", err) 2251 } 2252 2253 // App Profile list should contain default, app_profile1 2254 if got, want := len(profiles), 2; got != want { 2255 t.Fatalf("Initial app profile list len: %d, want: %d", got, want) 2256 } 2257 2258 for _, test := range []struct { 2259 desc string 2260 uattrs ProfileAttrsToUpdate 2261 want *btapb.AppProfile // nil means error 2262 }{ 2263 { 2264 desc: "empty update", 2265 uattrs: ProfileAttrsToUpdate{}, 2266 want: nil, 2267 }, 2268 2269 { 2270 desc: "empty description update", 2271 uattrs: ProfileAttrsToUpdate{Description: ""}, 2272 want: &btapb.AppProfile{ 2273 Name: gotProfile.Name, 2274 Description: "", 2275 RoutingPolicy: gotProfile.RoutingPolicy, 2276 Etag: gotProfile.Etag, 2277 }, 2278 }, 2279 { 2280 desc: "routing update", 2281 uattrs: ProfileAttrsToUpdate{ 2282 RoutingPolicy: SingleClusterRouting, 2283 ClusterID: testEnv.Config().Cluster, 2284 }, 2285 want: &btapb.AppProfile{ 2286 Name: gotProfile.Name, 2287 Description: "", 2288 Etag: gotProfile.Etag, 2289 RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ 2290 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 2291 ClusterId: testEnv.Config().Cluster, 2292 }, 2293 }, 2294 }, 2295 }, 2296 } { 2297 err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) 2298 if err != nil { 2299 if test.want != nil { 2300 t.Errorf("%s: %v", test.desc, err) 2301 } 2302 continue 2303 } 2304 if err == nil && test.want == nil { 2305 t.Errorf("%s: got nil, want error", test.desc) 2306 
continue 2307 } 2308 2309 got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 2310 2311 if !proto.Equal(got, test.want) { 2312 t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want) 2313 } 2314 2315 } 2316 2317 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 2318 if err != nil { 2319 t.Fatalf("Delete app profile: %v", err) 2320 } 2321} 2322 2323func TestIntegration_InstanceUpdate(t *testing.T) { 2324 testEnv, err := NewIntegrationEnv() 2325 if err != nil { 2326 t.Fatalf("IntegrationEnv: %v", err) 2327 } 2328 defer testEnv.Close() 2329 2330 timeout := 2 * time.Second 2331 if testEnv.Config().UseProd { 2332 timeout = 5 * time.Minute 2333 } 2334 ctx, cancel := context.WithTimeout(context.Background(), timeout) 2335 defer cancel() 2336 2337 adminClient, err := testEnv.NewAdminClient() 2338 if err != nil { 2339 t.Fatalf("NewAdminClient: %v", err) 2340 } 2341 2342 defer adminClient.Close() 2343 2344 iAdminClient, err := testEnv.NewInstanceAdminClient() 2345 if err != nil { 2346 t.Fatalf("NewInstanceAdminClient: %v", err) 2347 } 2348 2349 if iAdminClient == nil { 2350 return 2351 } 2352 2353 defer iAdminClient.Close() 2354 2355 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 2356 if err != nil { 2357 t.Errorf("InstanceInfo: %v", err) 2358 } 2359 if iInfo.Name != adminClient.instance { 2360 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 2361 } 2362 2363 if iInfo.DisplayName != adminClient.instance { 2364 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.DisplayName, adminClient.instance) 2365 } 2366 2367 const numNodes = 4 2368 // update cluster nodes 2369 if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { 2370 t.Errorf("UpdateCluster: %v", err) 2371 } 2372 2373 // get cluster after updating 2374 cis, err := iAdminClient.GetCluster(ctx, 
adminClient.instance, testEnv.Config().Cluster) 2375 if err != nil { 2376 t.Errorf("GetCluster %v", err) 2377 } 2378 if cis.ServeNodes != int(numNodes) { 2379 t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes)) 2380 } 2381} 2382 2383func TestIntegration_AdminBackup(t *testing.T) { 2384 testEnv, err := NewIntegrationEnv() 2385 if err != nil { 2386 t.Fatalf("IntegrationEnv: %v", err) 2387 } 2388 defer testEnv.Close() 2389 2390 if !testEnv.Config().UseProd { 2391 t.Skip("emulator doesn't support backups") 2392 } 2393 2394 timeout := 5 * time.Minute 2395 ctx, _ := context.WithTimeout(context.Background(), timeout) 2396 2397 adminClient, err := testEnv.NewAdminClient() 2398 if err != nil { 2399 t.Fatalf("NewAdminClient: %v", err) 2400 } 2401 defer adminClient.Close() 2402 2403 tblConf := TableConf{ 2404 TableID: testEnv.Config().Table, 2405 Families: map[string]GCPolicy{ 2406 "fam1": MaxVersionsPolicy(1), 2407 "fam2": MaxVersionsPolicy(2), 2408 }, 2409 } 2410 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 2411 t.Fatalf("Creating table from TableConf: %v", err) 2412 } 2413 // Delete the table at the end of the test. 
Schedule ahead of time 2414 // in case the client fails 2415 defer deleteTable(ctx, t, adminClient, tblConf.TableID) 2416 2417 sourceInstance := testEnv.Config().Instance 2418 sourceCluster := testEnv.Config().Cluster 2419 2420 iAdminClient, err := testEnv.NewInstanceAdminClient() 2421 if err != nil { 2422 t.Fatalf("NewInstanceAdminClient: %v", err) 2423 } 2424 defer iAdminClient.Close() 2425 diffInstance := testEnv.Config().Instance + "-d" 2426 diffCluster := sourceCluster + "-d" 2427 conf := &InstanceConf{ 2428 InstanceId: diffInstance, 2429 ClusterId: diffCluster, 2430 DisplayName: "different test sourceInstance", 2431 Zone: instanceToCreateZone2, 2432 InstanceType: DEVELOPMENT, 2433 Labels: map[string]string{"test-label-key": "test-label-value"}, 2434 } 2435 defer iAdminClient.DeleteInstance(ctx, diffInstance) 2436 // Create different instance to restore table. 2437 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 2438 t.Errorf("CreateInstance: %v", err) 2439 } 2440 2441 list := func(cluster string) ([]*BackupInfo, error) { 2442 infos := []*BackupInfo(nil) 2443 2444 it := adminClient.Backups(ctx, cluster) 2445 for { 2446 s, err := it.Next() 2447 if err == iterator.Done { 2448 break 2449 } 2450 if err != nil { 2451 return nil, err 2452 } 2453 infos = append(infos, s) 2454 } 2455 return infos, err 2456 } 2457 2458 // Create backup 2459 uniqueID := make([]byte, 8) 2460 _, err = rand.Read(uniqueID) 2461 if err != nil { 2462 t.Fatalf("Failed to generate a unique ID: %v", err) 2463 } 2464 2465 backupName := fmt.Sprintf("mybackup-%x", uniqueID) 2466 defer adminClient.DeleteBackup(ctx, sourceCluster, backupName) 2467 2468 if err = adminClient.CreateBackup(ctx, tblConf.TableID, sourceCluster, backupName, time.Now().Add(8*time.Hour)); err != nil { 2469 t.Fatalf("Creating backup: %v", err) 2470 } 2471 2472 // List backup 2473 backups, err := list(sourceCluster) 2474 if err != nil { 2475 t.Fatalf("Listing backups: %v", err) 2476 } 2477 if got, want := 
len(backups), 1; got < want { 2478 t.Fatalf("Listing backup count: %d, want: >= %d", got, want) 2479 } 2480 2481 foundBackup := false 2482 for _, backup := range backups { 2483 if backup.Name == backupName { 2484 foundBackup = true 2485 2486 if got, want := backup.SourceTable, tblConf.TableID; got != want { 2487 t.Errorf("Backup SourceTable: %s, want: %s", got, want) 2488 } 2489 if got, want := backup.ExpireTime, backup.StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 2490 t.Errorf("Backup ExpireTime: %s, want: %s", got, want) 2491 } 2492 2493 break 2494 } 2495 } 2496 2497 if !foundBackup { 2498 t.Errorf("Backup not found: %v", backupName) 2499 } 2500 2501 // Get backup 2502 backup, err := adminClient.BackupInfo(ctx, sourceCluster, backupName) 2503 if err != nil { 2504 t.Fatalf("BackupInfo: %v", backup) 2505 } 2506 if got, want := *backup, *backups[0]; cmp.Equal(got, &want) { 2507 t.Errorf("BackupInfo: %v, want: %v", got, want) 2508 } 2509 2510 // Update backup 2511 newExpireTime := time.Now().Add(10 * time.Hour) 2512 err = adminClient.UpdateBackup(ctx, sourceCluster, backupName, newExpireTime) 2513 if err != nil { 2514 t.Fatalf("UpdateBackup failed: %v", err) 2515 } 2516 2517 // Check that updated backup has the correct expire time 2518 updatedBackup, err := adminClient.BackupInfo(ctx, sourceCluster, backupName) 2519 if err != nil { 2520 t.Fatalf("BackupInfo: %v", err) 2521 } 2522 backup.ExpireTime = newExpireTime 2523 // Server clock and local clock may not be perfectly sync'ed. 
2524 if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute { 2525 t.Errorf("BackupInfo: %v, want: %v", got, want) 2526 } 2527 2528 // Restore backup 2529 restoredTable := tblConf.TableID + "-restored" 2530 defer deleteTable(ctx, t, adminClient, restoredTable) 2531 if err = adminClient.RestoreTable(ctx, restoredTable, sourceCluster, backupName); err != nil { 2532 t.Fatalf("RestoreTable: %v", err) 2533 } 2534 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 2535 t.Fatalf("Restored TableInfo: %v", err) 2536 } 2537 // Restore backup to different instance 2538 restoreTableName := tblConf.TableID + "-diff-restored" 2539 diffConf := IntegrationTestConfig{ 2540 Project: testEnv.Config().Project, 2541 Instance: diffInstance, 2542 Cluster: diffCluster, 2543 Table: restoreTableName, 2544 } 2545 env := &ProdEnv{ 2546 config: diffConf, 2547 } 2548 dAdminClient, err := env.NewAdminClient() 2549 if err != nil { 2550 t.Errorf("NewAdminClient: %v", err) 2551 } 2552 defer dAdminClient.Close() 2553 2554 defer deleteTable(ctx, t, dAdminClient, restoreTableName) 2555 if err = dAdminClient.RestoreTableFrom(ctx, sourceInstance, restoreTableName, sourceCluster, backupName); err != nil { 2556 t.Fatalf("RestoreTableFrom: %v", err) 2557 } 2558 tblInfo, err := dAdminClient.TableInfo(ctx, restoreTableName) 2559 if err != nil { 2560 t.Fatalf("Restored to different sourceInstance failed, TableInfo: %v", err) 2561 } 2562 families := tblInfo.Families 2563 sort.Strings(tblInfo.Families) 2564 wantFams := []string{"fam1", "fam2"} 2565 if !testutil.Equal(families, wantFams) { 2566 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 2567 } 2568 2569 // Delete backup 2570 if err = adminClient.DeleteBackup(ctx, sourceCluster, backupName); err != nil { 2571 t.Fatalf("DeleteBackup: %v", err) 2572 } 2573 backups, err = list(sourceCluster) 2574 if err != nil { 2575 t.Fatalf("List after Delete: %v", err) 2576 } 2577 if got, 
want := len(backups), 0; got != want { 2578 t.Errorf("List after delete len: %d, want: %d", got, want) 2579 } 2580} 2581 2582// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. 2583func TestIntegration_DirectPathFallback(t *testing.T) { 2584 ctx := context.Background() 2585 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 2586 if err != nil { 2587 t.Fatal(err) 2588 } 2589 defer cleanup() 2590 2591 if !testEnv.Config().AttemptDirectPath { 2592 t.Skip() 2593 } 2594 2595 if len(blackholeDpv6Cmd) == 0 { 2596 t.Fatal("-it.blackhole-dpv6-cmd unset") 2597 } 2598 if len(blackholeDpv4Cmd) == 0 { 2599 t.Fatal("-it.blackhole-dpv4-cmd unset") 2600 } 2601 if len(allowDpv6Cmd) == 0 { 2602 t.Fatal("-it.allowdpv6-cmd unset") 2603 } 2604 if len(allowDpv4Cmd) == 0 { 2605 t.Fatal("-it.allowdpv4-cmd unset") 2606 } 2607 2608 if err := populatePresidentsGraph(table); err != nil { 2609 t.Fatal(err) 2610 } 2611 2612 // Precondition: wait for DirectPath to connect. 2613 dpEnabled := examineTraffic(ctx, testEnv, table, false) 2614 if !dpEnabled { 2615 t.Fatalf("Failed to observe RPCs over DirectPath") 2616 } 2617 2618 // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. 2619 blackholeDirectPath(testEnv, t) 2620 dpDisabled := examineTraffic(ctx, testEnv, table, true) 2621 if !dpDisabled { 2622 t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") 2623 } 2624 2625 // Disable the blackhole, and client should use DirectPath again. 2626 allowDirectPath(testEnv, t) 2627 dpEnabled = examineTraffic(ctx, testEnv, table, false) 2628 if !dpEnabled { 2629 t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") 2630 } 2631} 2632 2633// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true). 
2634func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool { 2635 numCount := 0 2636 const ( 2637 numRPCsToSend = 20 2638 minCompleteRPC = 40 2639 ) 2640 2641 start := time.Now() 2642 for time.Since(start) < 2*time.Minute { 2643 for i := 0; i < numRPCsToSend; i++ { 2644 _, _ = table.ReadRow(ctx, "j§adams") 2645 if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP { 2646 numCount++ 2647 if numCount >= minCompleteRPC { 2648 return true 2649 } 2650 } 2651 time.Sleep(100 * time.Millisecond) 2652 } 2653 } 2654 return false 2655} 2656 2657// Retries a function if it has an unavailable code for up to 30s using a backoff 2658func retryOnUnavailable(ctx context.Context, inner func() error) error { 2659 err := internal.Retry(ctx, gax.Backoff{Initial: 100 * time.Millisecond}, func() (stop bool, err error) { 2660 err = inner() 2661 if err == nil { 2662 return true, nil 2663 } 2664 s, ok := status.FromError(err) 2665 if !ok { 2666 return false, err 2667 } 2668 // Retry on Unavailable 2669 return s.Code() != codes.Unavailable, err 2670 }) 2671 return err 2672} 2673 2674func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { 2675 testEnv, err := NewIntegrationEnv() 2676 if err != nil { 2677 return nil, nil, nil, nil, "", nil, err 2678 } 2679 2680 var timeout time.Duration 2681 if testEnv.Config().UseProd { 2682 timeout = 10 * time.Minute 2683 t.Logf("Running test against production") 2684 } else { 2685 timeout = 5 * time.Minute 2686 t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) 2687 } 2688 ctx, cancel := context.WithTimeout(ctx, timeout) 2689 2690 client, err := testEnv.NewClient() 2691 if err != nil { 2692 t.Logf("Error creating client: %v", err) 2693 return nil, nil, nil, nil, "", nil, err 2694 } 2695 2696 adminClient, err := testEnv.NewAdminClient() 2697 if err != nil { 2698 
t.Logf("Error creating admin client: %v", err) 2699 2700 return nil, nil, nil, nil, "", nil, err 2701 } 2702 2703 if testEnv.Config().UseProd { 2704 // TODO: tables may not be successfully deleted in some cases, and will 2705 // become obsolete. We may need a way to automatically delete them. 2706 tableName = tableNameSpace.New() 2707 } else { 2708 tableName = testEnv.Config().Table 2709 } 2710 2711 if err := adminClient.CreateTable(ctx, tableName); err != nil { 2712 cancel() 2713 t.Logf("Error creating table: %v", err) 2714 return nil, nil, nil, nil, "", nil, err 2715 } 2716 2717 err = retryOnUnavailable(ctx, func() error { 2718 return adminClient.CreateColumnFamily(ctx, tableName, "follows") 2719 }) 2720 if err != nil { 2721 cancel() 2722 t.Logf("Error creating column family: %v", err) 2723 return nil, nil, nil, nil, "", nil, err 2724 } 2725 2726 return testEnv, client, adminClient, client.Open(tableName), tableName, func() { 2727 if err := adminClient.DeleteTable(ctx, tableName); err != nil { 2728 t.Errorf("DeleteTable got error %v", err) 2729 } 2730 cancel() 2731 client.Close() 2732 adminClient.Close() 2733 }, nil 2734} 2735 2736func formatReadItem(ri ReadItem) string { 2737 // Use the column qualifier only to make the test data briefer. 
2738 col := ri.Column[strings.Index(ri.Column, ":")+1:] 2739 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 2740} 2741 2742func fill(b, sub []byte) { 2743 for len(b) > len(sub) { 2744 n := copy(b, sub) 2745 b = b[n:] 2746 } 2747} 2748 2749func clearTimestamps(r Row) { 2750 for _, ris := range r { 2751 for i := range ris { 2752 ris[i].Timestamp = 0 2753 } 2754 } 2755} 2756 2757func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) { 2758 bo := gax.Backoff{ 2759 Initial: 100 * time.Millisecond, 2760 Max: 2 * time.Second, 2761 Multiplier: 1.2, 2762 } 2763 ctx, _ = context.WithTimeout(ctx, time.Second*30) 2764 2765 err := internal.Retry(ctx, bo, func() (bool, error) { 2766 err := ac.DeleteTable(ctx, name) 2767 if err != nil { 2768 return false, err 2769 } 2770 return true, nil 2771 }) 2772 if err != nil { 2773 t.Logf("DeleteTable: %v", err) 2774 } 2775} 2776 2777func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) { 2778 t.Helper() 2779 if !testEnv.Config().AttemptDirectPath { 2780 return 2781 } 2782 if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res { 2783 if testEnv.Config().DirectPathIPV4Only { 2784 t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP) 2785 } else { 2786 t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP) 2787 } 2788 } 2789} 2790 2791func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { 2792 remoteIP := testEnv.Peer().Addr.String() 2793 // DirectPath ipv4-only can only use ipv4 traffic. 2794 if testEnv.Config().DirectPathIPV4Only { 2795 return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) 2796 } 2797 // DirectPath ipv6 can use either ipv4 or ipv6 traffic. 
2798 return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) 2799} 2800 2801func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) { 2802 cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) 2803 out, _ := cmdRes.CombinedOutput() 2804 t.Logf(string(out)) 2805 if testEnv.Config().DirectPathIPV4Only { 2806 return 2807 } 2808 cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) 2809 out, _ = cmdRes.CombinedOutput() 2810 t.Logf(string(out)) 2811} 2812 2813func allowDirectPath(testEnv IntegrationEnv, t *testing.T) { 2814 cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) 2815 out, _ := cmdRes.CombinedOutput() 2816 t.Logf(string(out)) 2817 if testEnv.Config().DirectPathIPV4Only { 2818 return 2819 } 2820 cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) 2821 out, _ = cmdRes.CombinedOutput() 2822 t.Logf(string(out)) 2823} 2824