/*
Copyright 2019 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bigtable

import (
	"context"
	"flag"
	"fmt"
	"math"
	"math/rand"
	"os/exec"
	"reflect"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"

	"cloud.google.com/go/iam"
	"cloud.google.com/go/internal"
	"cloud.google.com/go/internal/testutil"
	"cloud.google.com/go/internal/uid"
	"github.com/golang/protobuf/proto"
	"github.com/google/go-cmp/cmp"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/iterator"
	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
	v1 "google.golang.org/genproto/googleapis/iam/v1"
	longrunning "google.golang.org/genproto/googleapis/longrunning"
	grpc "google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/emptypb"
)

// Address prefixes for DirectPath traffic.
// NOTE(review): presumably compared against the peer address by
// verifyDirectPathRemoteAddress, which is defined outside this view — confirm.
const (
	directPathIPV6Prefix = "[2001:4860:8040"
	directPathIPV4Prefix = "34.126"
)

var (
	// presidentsSocialGraph maps a row key to the column qualifiers that
	// populatePresidentsGraph writes for that row in the "follows" family.
	presidentsSocialGraph = map[string][]string{
		"wmckinley":   {"tjefferson"},
		"gwashington": {"j§adams"},
		"tjefferson":  {"gwashington", "j§adams"},
		"j§adams":     {"gwashington", "tjefferson"},
	}

	// tableNameSpace generates short unique IDs with the "cbt-test" prefix.
	tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true})
)

// populatePresidentsGraph writes presidentsSocialGraph into table. For every
// row key it builds one mutation that sets a cell per followed name in the
// "follows" family (timestamp 1000, value "1") and applies it. It stops at the
// first failed Apply and returns an error naming the offending row; it returns
// nil when every row was written.
func populatePresidentsGraph(table *Table) error {
	ctx := context.Background()
	for row, ss := range presidentsSocialGraph {
		mut := NewMutation()
		for _, name := range ss {
			mut.Set("follows", name, 1000, []byte("1"))
		}
		if err := table.Apply(ctx, row, mut); err != nil {
			return fmt.Errorf("Mutating row %q: %v", row, err)
		}
	}
	return nil
}

// Values of the integration-test command-line flags registered in init.
var (
	instanceToCreate      string
	instanceToCreateZone  string
	instanceToCreateZone2 string
	blackholeDpv6Cmd      string
	blackholeDpv4Cmd      string
	allowDpv6Cmd          string
	allowDpv4Cmd          string
)

// init registers the "it.*" flags that configure instance creation and the
// DirectPath blackhole/allow commands. Every command flag defaults to the
// empty string.
func init() {
	// Don't test instance creation by default, as quota is necessary and aborted tests could strand resources.
	flag.StringVar(&instanceToCreate, "it.instance-to-create", "",
		"The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.")
	flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b",
		"The zone in which to create the new test instance.")
	flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c",
		"The zone in which to create a second cluster in the test instance.")
	// Use sysctl or iptables to blackhole DirectPath IP for fallback tests.
	flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6")
	flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4")
	flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6")
	flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4")
}

// TestIntegration_ConditionalMutations applies two conditional mutations to
// the "tjefferson" row (one whose predicate filter matches, one whose filter
// does not) and then reads back the "j§adams" row.
func TestIntegration_ConditionalMutations(t *testing.T) {
	ctx := context.Background()
	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if err := populatePresidentsGraph(table); err != nil {
		t.Fatal(err)
	}

	// Do a conditional mutation with a complex filter.
mutTrue := NewMutation()
	mutTrue.Set("follows", "wmckinley", 1000, []byte("1"))
	// The predicate chains a column-qualifier regex with a value regex; mutTrue
	// is the branch applied when the predicate matches, and the false branch is nil.
	filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter("."))
	mut := NewCondMutation(filter, mutTrue, nil)
	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
		t.Fatalf("Conditionally mutating row: %v", err)
	}
	verifyDirectPathRemoteAddress(testEnv, t)
	// Do a second conditional mutation with a filter that does not match,
	// and thus no changes should be made (the true branch would delete the row).
	mutTrue = NewMutation()
	mutTrue.DeleteRow()
	filter = ColumnFilter("snoop.dogg")
	mut = NewCondMutation(filter, mutTrue, nil)
	if err := table.Apply(ctx, "tjefferson", mut); err != nil {
		t.Fatalf("Conditionally mutating row: %v", err)
	}
	verifyDirectPathRemoteAddress(testEnv, t)

	// Fetch a row.
	// NOTE(review): the row read back is "j§adams", while both conditional
	// mutations target "tjefferson", so this comparison does not directly
	// observe the effect of either mutation — confirm this is intended.
	row, err := table.ReadRow(ctx, "j§adams")
	if err != nil {
		t.Fatalf("Reading a row: %v", err)
	}
	verifyDirectPathRemoteAddress(testEnv, t)
	wantRow := Row{
		"follows": []ReadItem{
			{Row: "j§adams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")},
			{Row: "j§adams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")},
		},
	}
	if !testutil.Equal(row, wantRow) {
		t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow)
	}
}

// TestIntegration_PartialReadRows checks that ReadRows stops invoking its
// callback once the callback has returned false.
func TestIntegration_PartialReadRows(t *testing.T) {
	ctx := context.Background()
	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if err := populatePresidentsGraph(table); err != nil {
		t.Fatal(err)
	}

	// Do a scan and stop part way through.
	// Verify that the ReadRows callback doesn't keep running.
stopped := false
	err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool {
		// Keep scanning while row keys sort before "h".
		if r.Key() < "h" {
			return true
		}
		// First row at or after "h": ask ReadRows to stop.
		if !stopped {
			stopped = true
			return false
		}
		// Any further invocation means the stop request was ignored.
		t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key())
		return false
	})
	if err != nil {
		t.Fatalf("Partial ReadRows: %v", err)
	}
}

// TestIntegration_ReadRowList reads an explicit list of row keys with ReadRows
// and compares the formatted cells, joined with commas, against a fixed string.
func TestIntegration_ReadRowList(t *testing.T) {
	ctx := context.Background()
	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if err := populatePresidentsGraph(table); err != nil {
		t.Fatal(err)
	}

	// Read a RowList
	var elt []string
	// The keys are requested out of order; want lists the cells in row-key order.
	keys := RowList{"wmckinley", "gwashington", "j§adams"}
	want := "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,wmckinley-tjefferson-1"
	err = table.ReadRows(ctx, keys, func(r Row) bool {
		// Collect every cell of every family in the row.
		for _, ris := range r {
			for _, ri := range ris {
				elt = append(elt, formatReadItem(ri))
			}
		}
		return true
	})
	if err != nil {
		t.Fatalf("read RowList: %v", err)
	}

	if got := strings.Join(elt, ","); got != want {
		t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want)
	}
}

// TestIntegration_DeleteRow deletes one row with a DeleteRow mutation and
// checks that reading it afterwards yields an empty row.
func TestIntegration_DeleteRow(t *testing.T) {
	ctx := context.Background()
	_, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if err := populatePresidentsGraph(table); err != nil {
		t.Fatal(err)
	}

	// Delete a row and check it goes away.
mut := NewMutation()
	mut.DeleteRow()
	if err := table.Apply(ctx, "wmckinley", mut); err != nil {
		t.Fatalf("Apply DeleteRow: %v", err)
	}
	// Reading the deleted row is expected to succeed and return no cells.
	row, err := table.ReadRow(ctx, "wmckinley")
	if err != nil {
		t.Fatalf("Reading a row after DeleteRow: %v", err)
	}
	if len(row) != 0 {
		t.Fatalf("Read non-zero row after DeleteRow: %v", row)
	}
}

// TestIntegration_ReadModifyWrite applies a sequence of append and increment
// read-modify-write operations to one cell and checks the returned row after
// each step, then checks reads of rows that were created by RMW operations.
func TestIntegration_ReadModifyWrite(t *testing.T) {
	ctx := context.Background()
	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if err := populatePresidentsGraph(table); err != nil {
		t.Fatal(err)
	}

	if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}

	// appendRMW builds an RMW that appends b to counter:likes.
	appendRMW := func(b []byte) *ReadModifyWrite {
		rmw := NewReadModifyWrite()
		rmw.AppendValue("counter", "likes", b)
		return rmw
	}
	// incRMW builds an RMW that increments counter:likes by n.
	incRMW := func(n int64) *ReadModifyWrite {
		rmw := NewReadModifyWrite()
		rmw.Increment("counter", "likes", n)
		return rmw
	}
	// The steps are cumulative: two appends build the 8-byte big-endian value 17,
	// which the increment then raises to 25.
	rmwSeq := []struct {
		desc string
		rmw  *ReadModifyWrite
		want []byte
	}{
		{
			desc: "append #1",
			rmw:  appendRMW([]byte{0, 0, 0}),
			want: []byte{0, 0, 0},
		},
		{
			desc: "append #2",
			rmw:  appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17
			want: []byte{0, 0, 0, 0, 0, 0, 0, 17},
		},
		{
			desc: "increment",
			rmw:  incRMW(8),
			want: []byte{0, 0, 0, 0, 0, 0, 0, 25},
		},
	}
	for _, step := range rmwSeq {
		row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw)
		if err != nil {
			t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err)
		}
		verifyDirectPathRemoteAddress(testEnv, t)
		// Make sure the modified cell returned by the RMW operation has a timestamp.
if row["counter"][0].Timestamp == 0 {
			t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp)
		}
		// Timestamps are cleared before comparing, so wantRow carries no
		// Timestamp value.
		clearTimestamps(row)
		wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}}
		if !testutil.Equal(row, wantRow) {
			t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow)
		}
	}

	// Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator.
	// The rows are created in descending key order ("-2" before "-1").
	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0}))
	if err != nil {
		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
	}
	verifyDirectPathRemoteAddress(testEnv, t)
	_, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0}))
	if err != nil {
		t.Fatalf("ApplyReadModifyWrite null string: %v", err)
	}
	verifyDirectPathRemoteAddress(testEnv, t)
	// Get only the correct row back on read.
	r, err := table.ReadRow(ctx, "issue-723-1")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	verifyDirectPathRemoteAddress(testEnv, t)
	if r.Key() != "issue-723-1" {
		t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1")
	}
}

// TestIntegration_ArbitraryTimestamps writes several versions of two columns
// at explicit timestamps and checks version ordering, the per-row cell and
// timestamp filters, and the DeleteTimestampRange, DeleteCellsInFamily and
// DeleteCellsInColumn mutations.
func TestIntegration_ArbitraryTimestamps(t *testing.T) {
	ctx := context.Background()
	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Test arbitrary timestamps more thoroughly.
	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}
	// The loop starts at 1 and stops before numVersions, so versions 1..3 are written.
	const numVersions = 4
	mut := NewMutation()
	for i := 1; i < numVersions; i++ {
		// Timestamps are used in thousands because the server
		// only permits that granularity.
mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
		mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i)))
	}
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err := table.ReadRow(ctx, "testrow")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow := Row{"ts": []ReadItem{
		// These should be returned in descending timestamp order.
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow)
	}

	// Do the same read, but filter to the latest two versions.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow)
	}
	// Check cell offset / limit
	// A limit of 3 is expected to keep the first three cells of the row (all of "col").
	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow)
	}
	// An offset of 3 is expected to skip those same three cells, leaving "col2".
	r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow)
	}
	// Check timestamp range filtering (with truncation)
	// The expected cells (2000 and 1000) show that a start of 1001 still
	// includes the cell at 1000 and that the end of 3000 is exclusive.
	// NOTE(review): the failure message below names (1000, 3000) while the
	// call passes (1001, 3000).
	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow)
	}
	// With an end of 0 every version from 1000 upwards is expected back.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow)
	}
	// Delete non-existing cells, no such column family in this row
	// Should not delete anything
	if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}
	mut = NewMutation()
	mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	// wantRow still holds the full six-cell expectation from the previous check.
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
	}
	// Delete non-existing cells, no such column in this column family
	// Should not delete anything
	mut = NewMutation()
	mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow)
	}
	// Delete the cell with timestamp 2000 and repeat the last read,
	// checking that we get ts 3000 and ts 1000.
	// Only "col" is targeted, so "col2" is expected to keep its 3000 and 2000 versions.
	mut = NewMutation()
	mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval
	if err := table.Apply(ctx, "testrow", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2)))
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")},
		{Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow)
	}

	// Check DeleteCellsInFamily
	if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}

	// The same mutation is applied to row1 and row2; only row1 then has its
	// "status" family deleted, so row2 serves as the control.
	mut = NewMutation()
	mut.Set("status", "start", 2000, []byte("2"))
	mut.Set("status", "end", 3000, []byte("3"))
	mut.Set("ts", "col", 1000, []byte("1"))
	if err := table.Apply(ctx, "row1", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	if err := table.Apply(ctx, "row2", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}

	mut = NewMutation()
	mut.DeleteCellsInFamily("status")
	if err := table.Apply(ctx, "row1", mut); err != nil {
		t.Fatalf("Delete cf: %v", err)
	}

	// ColumnFamily removed
	r, err = table.ReadRow(ctx, "row1")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{"ts": []ReadItem{
		{Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
	}}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow)
	}

	// ColumnFamily not removed
	r, err = table.ReadRow(ctx, "row2")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{
		"ts": []ReadItem{
			{Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")},
		},
		"status": []ReadItem{
			{Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")},
			{Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
		},
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow)
	}

	// Check DeleteCellsInColumn
	// Columns are removed one at a time ("middle", "start", "end"), with a
	// read after each deletion.
	mut = NewMutation()
	mut.Set("status", "start", 2000, []byte("2"))
	mut.Set("status", "middle", 3000, []byte("3"))
	mut.Set("status", "end", 1000, []byte("1"))
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	mut = NewMutation()
	mut.DeleteCellsInColumn("status", "middle")
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Delete column: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{
		"status": []ReadItem{
			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
			{Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")},
		},
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
	}
	mut = NewMutation()
	mut.DeleteCellsInColumn("status", "start")
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Delete column: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	wantRow = Row{
		"status": []ReadItem{
			{Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")},
		},
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow)
	}
	mut = NewMutation()
	mut.DeleteCellsInColumn("status", "end")
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Delete column: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if len(r) != 0 {
		t.Fatalf("Delete column: got %v, want empty row", r)
	}
	// Add same cell after delete
	// wantRow still holds the single status:end cell from the earlier check,
	// which is exactly what is re-written here.
	mut = NewMutation()
	mut.Set("status", "end", 1000, []byte("1"))
	if err := table.Apply(ctx, "row3", mut); err != nil {
		t.Fatalf("Mutating row: %v", err)
	}
	r, err = table.ReadRow(ctx, "row3")
	if err != nil {
		t.Fatalf("Reading row: %v", err)
	}
	if !testutil.Equal(r, wantRow) {
		t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow)
	}
}

// TestIntegration_HighlyConcurrentReadsAndWrites issues many reads and writes
// against a single row from concurrent goroutines and fails if any of them
// returns an error.
func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) {
	ctx := context.Background()
	_, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if err := populatePresidentsGraph(table); err != nil {
		t.Fatal(err)
	}

	if err :=
adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}

	// Do highly concurrent reads/writes.
	const maxConcurrency = 1000
	var wg sync.WaitGroup
	for i := 0; i < maxConcurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine performs exactly one operation, chosen at random:
			// a read for r in [0,30), a write for r in [30,100).
			switch r := rand.Intn(100); { // r ∈ [0,100)
			case 0 <= r && r < 30:
				// Do a read.
				_, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1)))
				if err != nil {
					t.Errorf("Concurrent read: %v", err)
				}
			case 30 <= r && r < 100:
				// Do a write.
				mut := NewMutation()
				mut.Set("ts", "col", 1000, []byte("data"))
				if err := table.Apply(ctx, "testrow", mut); err != nil {
					t.Errorf("Concurrent write: %v", err)
				}
			}
		}()
	}
	// Wait for every goroutine before the deferred cleanup runs.
	wg.Wait()
}

// TestIntegration_LargeReadsWritesAndScans writes and reads back a single
// 5 MB cell, writes 1000 medium-sized rows and scans them (with and without
// a row limit), and exercises ApplyBulk with both valid and invalid mutations.
func TestIntegration_LargeReadsWritesAndScans(t *testing.T) {
	ctx := context.Background()
	testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil {
		t.Fatalf("Creating column family: %v", err)
	}

	bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set.
667 nonsense := []byte("lorem ipsum dolor sit amet, ") 668 fill(bigBytes, nonsense) 669 mut := NewMutation() 670 mut.Set("ts", "col", 1000, bigBytes) 671 if err := table.Apply(ctx, "bigrow", mut); err != nil { 672 t.Fatalf("Big write: %v", err) 673 } 674 verifyDirectPathRemoteAddress(testEnv, t) 675 r, err := table.ReadRow(ctx, "bigrow") 676 if err != nil { 677 t.Fatalf("Big read: %v", err) 678 } 679 verifyDirectPathRemoteAddress(testEnv, t) 680 wantRow := Row{"ts": []ReadItem{ 681 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 682 }} 683 if !testutil.Equal(r, wantRow) { 684 t.Fatalf("Big read returned incorrect bytes: %v", r) 685 } 686 687 var wg sync.WaitGroup 688 // Now write 1000 rows, each with 82 KB values, then scan them all. 689 medBytes := make([]byte, 82<<10) 690 fill(medBytes, nonsense) 691 sem := make(chan int, 50) // do up to 50 mutations at a time. 692 for i := 0; i < 1000; i++ { 693 mut := NewMutation() 694 mut.Set("ts", "big-scan", 1000, medBytes) 695 row := fmt.Sprintf("row-%d", i) 696 wg.Add(1) 697 go func() { 698 defer wg.Done() 699 defer func() { <-sem }() 700 sem <- 1 701 if err := table.Apply(ctx, row, mut); err != nil { 702 t.Errorf("Preparing large scan: %v", err) 703 } 704 verifyDirectPathRemoteAddress(testEnv, t) 705 }() 706 } 707 wg.Wait() 708 n := 0 709 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 710 for _, ris := range r { 711 for _, ri := range ris { 712 n += len(ri.Value) 713 } 714 } 715 return true 716 }, RowFilter(ColumnFilter("big-scan"))) 717 if err != nil { 718 t.Fatalf("Doing large scan: %v", err) 719 } 720 verifyDirectPathRemoteAddress(testEnv, t) 721 if want := 1000 * len(medBytes); n != want { 722 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 723 } 724 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 
725 rc := 0 726 wantRc := 3 727 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 728 rc++ 729 return true 730 }, LimitRows(int64(wantRc))) 731 if err != nil { 732 t.Fatal(err) 733 } 734 verifyDirectPathRemoteAddress(testEnv, t) 735 if rc != wantRc { 736 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 737 } 738 739 // Test bulk mutations 740 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 741 t.Fatalf("Creating column family: %v", err) 742 } 743 bulkData := map[string][]string{ 744 "red sox": {"2004", "2007", "2013"}, 745 "patriots": {"2001", "2003", "2004", "2014"}, 746 "celtics": {"1981", "1984", "1986", "2008"}, 747 } 748 var rowKeys []string 749 var muts []*Mutation 750 for row, ss := range bulkData { 751 mut := NewMutation() 752 for _, name := range ss { 753 mut.Set("bulk", name, 1000, []byte("1")) 754 } 755 rowKeys = append(rowKeys, row) 756 muts = append(muts, mut) 757 } 758 status, err := table.ApplyBulk(ctx, rowKeys, muts) 759 if err != nil { 760 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 761 } 762 verifyDirectPathRemoteAddress(testEnv, t) 763 if status != nil { 764 t.Fatalf("non-nil errors: %v", err) 765 } 766 767 // Read each row back 768 for rowKey, ss := range bulkData { 769 row, err := table.ReadRow(ctx, rowKey) 770 if err != nil { 771 t.Fatalf("Reading a bulk row: %v", err) 772 } 773 verifyDirectPathRemoteAddress(testEnv, t) 774 var wantItems []ReadItem 775 for _, val := range ss { 776 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 777 } 778 wantRow := Row{"bulk": wantItems} 779 if !testutil.Equal(row, wantRow) { 780 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 781 } 782 } 783 784 // Test bulk write errors. 785 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 
786 badMut := NewMutation() 787 badMut.Set("badfamily", "col", ServerTime, nil) 788 badMut2 := NewMutation() 789 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 790 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 791 if err != nil { 792 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 793 } 794 verifyDirectPathRemoteAddress(testEnv, t) 795 if status == nil { 796 t.Fatalf("No errors for bad bulk mutation") 797 } else if status[0] == nil || status[1] == nil { 798 t.Fatalf("No error for bad bulk mutation") 799 } 800} 801 802func TestIntegration_Read(t *testing.T) { 803 ctx := context.Background() 804 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 805 if err != nil { 806 t.Fatal(err) 807 } 808 defer cleanup() 809 810 // Insert some data. 811 initialData := map[string][]string{ 812 "wmckinley": {"tjefferson"}, 813 "gwashington": {"j§adams"}, 814 "tjefferson": {"gwashington", "j§adams", "wmckinley"}, 815 "j§adams": {"gwashington", "tjefferson"}, 816 } 817 for row, ss := range initialData { 818 mut := NewMutation() 819 for _, name := range ss { 820 mut.Set("follows", name, 1000, []byte("1")) 821 } 822 if err := table.Apply(ctx, row, mut); err != nil { 823 t.Fatalf("Mutating row %q: %v", row, err) 824 } 825 verifyDirectPathRemoteAddress(testEnv, t) 826 } 827 828 for _, test := range []struct { 829 desc string 830 rr RowSet 831 filter Filter // may be nil 832 limit ReadOption // may be nil 833 834 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 835 // and join with a comma. 
836 want string 837 wantLabels []string 838 }{ 839 { 840 desc: "read all, unfiltered", 841 rr: RowRange{}, 842 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 843 }, 844 { 845 desc: "read with InfiniteRange, unfiltered", 846 rr: InfiniteRange("tjefferson"), 847 want: "tjefferson-gwashington-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 848 }, 849 { 850 desc: "read with NewRange, unfiltered", 851 rr: NewRange("gargamel", "hubbard"), 852 want: "gwashington-j§adams-1", 853 }, 854 { 855 desc: "read with PrefixRange, unfiltered", 856 rr: PrefixRange("j§ad"), 857 want: "j§adams-gwashington-1,j§adams-tjefferson-1", 858 }, 859 { 860 desc: "read with SingleRow, unfiltered", 861 rr: SingleRow("wmckinley"), 862 want: "wmckinley-tjefferson-1", 863 }, 864 { 865 desc: "read all, with ColumnFilter", 866 rr: RowRange{}, 867 filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson" 868 want: "gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,wmckinley-tjefferson-1", 869 }, 870 { 871 desc: "read all, with ColumnFilter, prefix", 872 rr: RowRange{}, 873 filter: ColumnFilter("j"), // no matches 874 want: "", 875 }, 876 { 877 desc: "read range, with ColumnRangeFilter", 878 rr: RowRange{}, 879 filter: ColumnRangeFilter("follows", "h", "k"), 880 want: "gwashington-j§adams-1,tjefferson-j§adams-1", 881 }, 882 { 883 desc: "read range from empty, with ColumnRangeFilter", 884 rr: RowRange{}, 885 filter: ColumnRangeFilter("follows", "", "u"), 886 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1", 887 }, 888 { 889 desc: "read range from start to empty, with ColumnRangeFilter", 890 rr: RowRange{}, 891 filter: ColumnRangeFilter("follows", "h", ""), 892 want: 
"gwashington-j§adams-1,j§adams-tjefferson-1,tjefferson-j§adams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 893 }, 894 { 895 desc: "read with RowKeyFilter", 896 rr: RowRange{}, 897 filter: RowKeyFilter(".*wash.*"), 898 want: "gwashington-j§adams-1", 899 }, 900 { 901 desc: "read with RowKeyFilter unicode", 902 rr: RowRange{}, 903 filter: RowKeyFilter(".*j§.*"), 904 want: "j§adams-gwashington-1,j§adams-tjefferson-1", 905 }, 906 { 907 desc: "read with RowKeyFilter escaped", 908 rr: RowRange{}, 909 filter: RowKeyFilter(`.*j\xC2\xA7.*`), 910 want: "j§adams-gwashington-1,j§adams-tjefferson-1", 911 }, 912 { 913 desc: "read with RowKeyFilter, prefix", 914 rr: RowRange{}, 915 filter: RowKeyFilter("gwash"), 916 want: "", 917 }, 918 { 919 desc: "read with RowKeyFilter, no matches", 920 rr: RowRange{}, 921 filter: RowKeyFilter(".*xxx.*"), 922 want: "", 923 }, 924 { 925 desc: "read with FamilyFilter, no matches", 926 rr: RowRange{}, 927 filter: FamilyFilter(".*xxx.*"), 928 want: "", 929 }, 930 { 931 desc: "read with ColumnFilter + row limit", 932 rr: RowRange{}, 933 filter: ColumnFilter(".*j.*"), // matches "j§adams" and "tjefferson" 934 limit: LimitRows(2), 935 want: "gwashington-j§adams-1,j§adams-tjefferson-1", 936 }, 937 { 938 desc: "apply labels to the result rows", 939 rr: RowRange{}, 940 filter: LabelFilter("test-label"), 941 limit: LimitRows(2), 942 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1", 943 wantLabels: []string{"test-label", "test-label", "test-label"}, 944 }, 945 { 946 desc: "read all, strip values", 947 rr: RowRange{}, 948 filter: StripValueFilter(), 949 want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 950 }, 951 { 952 desc: "read with ColumnFilter + row limit + strip values", 953 rr: RowRange{}, 954 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "j§adams" and "tjefferson" 955 limit: 
LimitRows(2), 956 want: "gwashington-j§adams-,j§adams-tjefferson-", 957 }, 958 { 959 desc: "read with condition, strip values on true", 960 rr: RowRange{}, 961 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 962 want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 963 }, 964 { 965 desc: "read with condition, strip values on false", 966 rr: RowRange{}, 967 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 968 want: "gwashington-j§adams-,j§adams-gwashington-,j§adams-tjefferson-,tjefferson-gwashington-,tjefferson-j§adams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 969 }, 970 { 971 desc: "read with ValueRangeFilter + row limit", 972 rr: RowRange{}, 973 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 974 limit: LimitRows(2), 975 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1", 976 }, 977 { 978 desc: "read with ValueRangeFilter, no match on exclusive end", 979 rr: RowRange{}, 980 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 981 want: "", 982 }, 983 { 984 desc: "read with ValueRangeFilter, no matches", 985 rr: RowRange{}, 986 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 987 want: "", 988 }, 989 { 990 desc: "read with InterleaveFilter, no matches on all filters", 991 rr: RowRange{}, 992 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 993 want: "", 994 }, 995 { 996 desc: "read with InterleaveFilter, no duplicate cells", 997 rr: RowRange{}, 998 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 999 want: "gwashington-j§adams-1,j§adams-gwashington-1,j§adams-tjefferson-1,tjefferson-gwashington-1,tjefferson-j§adams-1,wmckinley-tjefferson-1", 1000 }, 1001 { 1002 desc: "read with InterleaveFilter, with duplicate cells", 1003 rr: RowRange{}, 1004 filter: 
InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 1005 want: "j§adams-gwashington-1,j§adams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 1006 }, 1007 { 1008 desc: "read with a RowRangeList and no filter", 1009 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 1010 want: "gwashington-j§adams-1,wmckinley-tjefferson-1", 1011 }, 1012 { 1013 desc: "chain that excludes rows and matches nothing, in a condition", 1014 rr: RowRange{}, 1015 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 1016 want: "", 1017 }, 1018 { 1019 desc: "chain that ends with an interleave that has no match. covers #804", 1020 rr: RowRange{}, 1021 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 1022 want: "", 1023 }, 1024 } { 1025 t.Run(test.desc, func(t *testing.T) { 1026 var opts []ReadOption 1027 if test.filter != nil { 1028 opts = append(opts, RowFilter(test.filter)) 1029 } 1030 if test.limit != nil { 1031 opts = append(opts, test.limit) 1032 } 1033 var elt, labels []string 1034 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 1035 for _, ris := range r { 1036 for _, ri := range ris { 1037 labels = append(labels, ri.Labels...) 1038 elt = append(elt, formatReadItem(ri)) 1039 } 1040 } 1041 return true 1042 }, opts...) 
1043 if err != nil { 1044 t.Fatal(err) 1045 } 1046 verifyDirectPathRemoteAddress(testEnv, t) 1047 if got := strings.Join(elt, ","); got != test.want { 1048 t.Fatalf("got %q\nwant %q", got, test.want) 1049 } 1050 if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) { 1051 t.Fatalf("got %q\nwant %q", got, want) 1052 } 1053 }) 1054 } 1055} 1056 1057func TestIntegration_SampleRowKeys(t *testing.T) { 1058 ctx := context.Background() 1059 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 1060 if err != nil { 1061 t.Fatal(err) 1062 } 1063 defer cleanup() 1064 1065 // Insert some data. 1066 initialData := map[string][]string{ 1067 "wmckinley11": {"tjefferson11"}, 1068 "gwashington77": {"j§adams77"}, 1069 "tjefferson0": {"gwashington0", "j§adams0"}, 1070 } 1071 1072 for row, ss := range initialData { 1073 mut := NewMutation() 1074 for _, name := range ss { 1075 mut.Set("follows", name, 1000, []byte("1")) 1076 } 1077 if err := table.Apply(ctx, row, mut); err != nil { 1078 t.Fatalf("Mutating row %q: %v", row, err) 1079 } 1080 verifyDirectPathRemoteAddress(testEnv, t) 1081 } 1082 sampleKeys, err := table.SampleRowKeys(context.Background()) 1083 if err != nil { 1084 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1085 } 1086 if len(sampleKeys) == 0 { 1087 t.Error("SampleRowKeys length 0") 1088 } 1089} 1090 1091func TestIntegration_Admin(t *testing.T) { 1092 testEnv, err := NewIntegrationEnv() 1093 if err != nil { 1094 t.Fatalf("IntegrationEnv: %v", err) 1095 } 1096 defer testEnv.Close() 1097 1098 timeout := 2 * time.Second 1099 if testEnv.Config().UseProd { 1100 timeout = 5 * time.Minute 1101 } 1102 ctx, _ := context.WithTimeout(context.Background(), timeout) 1103 1104 adminClient, err := testEnv.NewAdminClient() 1105 if err != nil { 1106 t.Fatalf("NewAdminClient: %v", err) 1107 } 1108 defer adminClient.Close() 1109 1110 iAdminClient, err := testEnv.NewInstanceAdminClient() 1111 if err != nil { 1112 t.Fatalf("NewInstanceAdminClient: %v", err) 1113 } 
1114 if iAdminClient != nil { 1115 defer iAdminClient.Close() 1116 1117 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1118 if err != nil { 1119 t.Errorf("InstanceInfo: %v", err) 1120 } 1121 if iInfo.Name != adminClient.instance { 1122 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1123 } 1124 } 1125 1126 list := func() []string { 1127 tbls, err := adminClient.Tables(ctx) 1128 if err != nil { 1129 t.Fatalf("Fetching list of tables: %v", err) 1130 } 1131 sort.Strings(tbls) 1132 return tbls 1133 } 1134 containsAll := func(got, want []string) bool { 1135 gotSet := make(map[string]bool) 1136 1137 for _, s := range got { 1138 gotSet[s] = true 1139 } 1140 for _, s := range want { 1141 if !gotSet[s] { 1142 return false 1143 } 1144 } 1145 return true 1146 } 1147 1148 defer deleteTable(ctx, t, adminClient, "mytable") 1149 1150 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1151 t.Fatalf("Creating table: %v", err) 1152 } 1153 1154 defer deleteTable(ctx, t, adminClient, "myothertable") 1155 1156 if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { 1157 t.Fatalf("Creating table: %v", err) 1158 } 1159 1160 if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { 1161 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1162 } 1163 1164 must(adminClient.WaitForReplication(ctx, "mytable")) 1165 1166 if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { 1167 t.Fatalf("Deleting table: %v", err) 1168 } 1169 tables := list() 1170 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1171 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1172 } 1173 if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { 1174 t.Errorf("adminClient.Tables return %#v. 
unwanted %#v", got, unwanted) 1175 } 1176 1177 tblConf := TableConf{ 1178 TableID: "conftable", 1179 Families: map[string]GCPolicy{ 1180 "fam1": MaxVersionsPolicy(1), 1181 "fam2": MaxVersionsPolicy(2), 1182 }, 1183 } 1184 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1185 t.Fatalf("Creating table from TableConf: %v", err) 1186 } 1187 defer deleteTable(ctx, t, adminClient, tblConf.TableID) 1188 1189 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1190 if err != nil { 1191 t.Fatalf("Getting table info: %v", err) 1192 } 1193 sort.Strings(tblInfo.Families) 1194 wantFams := []string{"fam1", "fam2"} 1195 if !testutil.Equal(tblInfo.Families, wantFams) { 1196 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1197 } 1198 1199 // Populate mytable and drop row ranges 1200 if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { 1201 t.Fatalf("Creating column family: %v", err) 1202 } 1203 1204 client, err := testEnv.NewClient() 1205 if err != nil { 1206 t.Fatalf("NewClient: %v", err) 1207 } 1208 defer client.Close() 1209 1210 tbl := client.Open("mytable") 1211 1212 prefixes := []string{"a", "b", "c"} 1213 for _, prefix := range prefixes { 1214 for i := 0; i < 5; i++ { 1215 mut := NewMutation() 1216 mut.Set("cf", "col", 1000, []byte("1")) 1217 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1218 t.Fatalf("Mutating row: %v", err) 1219 } 1220 } 1221 } 1222 1223 if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { 1224 t.Errorf("DropRowRange a: %v", err) 1225 } 1226 if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { 1227 t.Errorf("DropRowRange c: %v", err) 1228 } 1229 if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { 1230 t.Errorf("DropRowRange x: %v", err) 1231 } 1232 1233 var gotRowCount int 1234 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1235 gotRowCount++ 1236 if !strings.HasPrefix(row.Key(), "b") { 
1237 t.Errorf("Invalid row after dropping range: %v", row) 1238 } 1239 return true 1240 })) 1241 if gotRowCount != 5 { 1242 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1243 } 1244 1245 if err = adminClient.DropAllRows(ctx, "mytable"); err != nil { 1246 t.Errorf("DropAllRows mytable: %v", err) 1247 } 1248 1249 gotRowCount = 0 1250 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1251 gotRowCount++ 1252 return true 1253 })) 1254 if gotRowCount != 0 { 1255 t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0) 1256 } 1257} 1258 1259func TestIntegration_TableIam(t *testing.T) { 1260 testEnv, err := NewIntegrationEnv() 1261 if err != nil { 1262 t.Fatalf("IntegrationEnv: %v", err) 1263 } 1264 defer testEnv.Close() 1265 1266 if !testEnv.Config().UseProd { 1267 t.Skip("emulator doesn't support IAM Policy creation") 1268 } 1269 1270 timeout := 5 * time.Minute 1271 ctx, _ := context.WithTimeout(context.Background(), timeout) 1272 1273 adminClient, err := testEnv.NewAdminClient() 1274 if err != nil { 1275 t.Fatalf("NewAdminClient: %v", err) 1276 } 1277 defer adminClient.Close() 1278 1279 defer deleteTable(ctx, t, adminClient, "mytable") 1280 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1281 t.Fatalf("Creating table: %v", err) 1282 } 1283 1284 // Verify that the IAM Controls work for Tables. 
1285 iamHandle := adminClient.TableIAM("mytable") 1286 p, err := iamHandle.Policy(ctx) 1287 if err != nil { 1288 t.Errorf("Iam GetPolicy mytable: %v", err) 1289 } 1290 if err = iamHandle.SetPolicy(ctx, p); err != nil { 1291 t.Errorf("Iam SetPolicy mytable: %v", err) 1292 } 1293 if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil { 1294 t.Errorf("Iam TestPermissions mytable: %v", err) 1295 } 1296} 1297 1298func TestIntegration_BackupIAM(t *testing.T) { 1299 testEnv, err := NewIntegrationEnv() 1300 if err != nil { 1301 t.Fatalf("IntegrationEnv: %v", err) 1302 } 1303 defer testEnv.Close() 1304 1305 if !testEnv.Config().UseProd { 1306 t.Skip("emulator doesn't support IAM Policy creation") 1307 } 1308 timeout := 5 * time.Minute 1309 ctx, _ := context.WithTimeout(context.Background(), timeout) 1310 1311 adminClient, err := testEnv.NewAdminClient() 1312 if err != nil { 1313 t.Fatalf("NewAdminClient: %v", err) 1314 } 1315 defer adminClient.Close() 1316 1317 table := testEnv.Config().Table 1318 cluster := testEnv.Config().Cluster 1319 1320 defer deleteTable(ctx, t, adminClient, table) 1321 if err := adminClient.CreateTable(ctx, table); err != nil { 1322 t.Fatalf("Creating table: %v", err) 1323 } 1324 // Create backup. 1325 backup := "backup" 1326 defer adminClient.DeleteBackup(ctx, cluster, backup) 1327 if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil { 1328 t.Fatalf("Creating backup: %v", err) 1329 } 1330 iamHandle := adminClient.BackupIAM(cluster, backup) 1331 // Get backup policy. 1332 p, err := iamHandle.Policy(ctx) 1333 if err != nil { 1334 t.Errorf("iamHandle.Policy: %v", err) 1335 } 1336 // The resource is new, so the policy should be empty. 1337 if got := p.Roles(); len(got) > 0 { 1338 t.Errorf("got roles %v, want none", got) 1339 } 1340 // Set backup policy. 1341 member := "domain:google.com" 1342 // Add a member, set the policy, then check that the member is present. 
1343 p.Add(member, iam.Viewer) 1344 if err = iamHandle.SetPolicy(ctx, p); err != nil { 1345 t.Errorf("iamHandle.SetPolicy: %v", err) 1346 } 1347 p, err = iamHandle.Policy(ctx) 1348 if err != nil { 1349 t.Errorf("iamHandle.Policy: %v", err) 1350 } 1351 if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) { 1352 t.Errorf("iamHandle.Policy: got %v, want %v", got, want) 1353 } 1354 // Test backup permissions. 1355 permissions := []string{"bigtable.backups.get", "bigtable.backups.update"} 1356 _, err = iamHandle.TestPermissions(ctx, permissions) 1357 if err != nil { 1358 t.Errorf("iamHandle.TestPermissions: %v", err) 1359 } 1360} 1361 1362func TestIntegration_AdminCreateInstance(t *testing.T) { 1363 if instanceToCreate == "" { 1364 t.Skip("instanceToCreate not set, skipping instance creation testing") 1365 } 1366 1367 testEnv, err := NewIntegrationEnv() 1368 if err != nil { 1369 t.Fatalf("IntegrationEnv: %v", err) 1370 } 1371 defer testEnv.Close() 1372 1373 if !testEnv.Config().UseProd { 1374 t.Skip("emulator doesn't support instance creation") 1375 } 1376 1377 timeout := 5 * time.Minute 1378 ctx, _ := context.WithTimeout(context.Background(), timeout) 1379 1380 iAdminClient, err := testEnv.NewInstanceAdminClient() 1381 if err != nil { 1382 t.Fatalf("NewInstanceAdminClient: %v", err) 1383 } 1384 defer iAdminClient.Close() 1385 1386 clusterID := instanceToCreate + "-cluster" 1387 1388 // Create a development instance 1389 conf := &InstanceConf{ 1390 InstanceId: instanceToCreate, 1391 ClusterId: clusterID, 1392 DisplayName: "test instance", 1393 Zone: instanceToCreateZone, 1394 InstanceType: DEVELOPMENT, 1395 Labels: map[string]string{"test-label-key": "test-label-value"}, 1396 } 1397 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1398 t.Fatalf("CreateInstance: %v", err) 1399 } 1400 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1401 1402 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1403 if err != 
nil { 1404 t.Fatalf("InstanceInfo: %v", err) 1405 } 1406 1407 // Basic return values are tested elsewhere, check instance type 1408 if iInfo.InstanceType != DEVELOPMENT { 1409 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1410 } 1411 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1412 t.Fatalf("Labels: %v, want: %v", got, want) 1413 } 1414 1415 // Update everything we can about the instance in one call. 1416 confWithClusters := &InstanceWithClustersConfig{ 1417 InstanceID: instanceToCreate, 1418 DisplayName: "new display name", 1419 InstanceType: PRODUCTION, 1420 Labels: map[string]string{"new-label-key": "new-label-value"}, 1421 Clusters: []ClusterConfig{ 1422 {ClusterID: clusterID, NumNodes: 5}, 1423 }, 1424 } 1425 1426 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1427 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1428 } 1429 1430 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1431 if err != nil { 1432 t.Fatalf("InstanceInfo: %v", err) 1433 } 1434 1435 if iInfo.InstanceType != PRODUCTION { 1436 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1437 } 1438 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1439 t.Fatalf("Labels: %v, want: %v", got, want) 1440 } 1441 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1442 t.Fatalf("Display name: %q, want: %q", got, want) 1443 } 1444 1445 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1446 if err != nil { 1447 t.Fatalf("GetCluster: %v", err) 1448 } 1449 1450 if cInfo.ServeNodes != 5 { 1451 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1452 } 1453} 1454 1455func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) { 1456 // Check the environments 1457 if instanceToCreate == "" { 1458 t.Skip("instanceToCreate not set, skipping instance creation testing") 1459 } 1460 testEnv, err := NewIntegrationEnv() 1461 if err 
!= nil { 1462 t.Fatalf("IntegrationEnv: %v", err) 1463 } 1464 defer testEnv.Close() 1465 if !testEnv.Config().UseProd { 1466 t.Skip("emulator doesn't support instance creation") 1467 } 1468 1469 // Create an instance admin client 1470 timeout := 5 * time.Minute 1471 ctx, _ := context.WithTimeout(context.Background(), timeout) 1472 iAdminClient, err := testEnv.NewInstanceAdminClient() 1473 if err != nil { 1474 t.Fatalf("NewInstanceAdminClient: %v", err) 1475 } 1476 defer iAdminClient.Close() 1477 1478 // Create a test instance 1479 conf := &InstanceConf{ 1480 InstanceId: instanceToCreate, 1481 ClusterId: instanceToCreate + "-cluster", 1482 DisplayName: "test instance", 1483 InstanceType: DEVELOPMENT, 1484 Zone: instanceToCreateZone, 1485 } 1486 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1487 t.Fatalf("CreateInstance: %v", err) 1488 } 1489 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1490 1491 // Check the created test instances 1492 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1493 if err != nil { 1494 t.Fatalf("InstanceInfo: %v", err) 1495 } 1496 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1497 t.Fatalf("Labels: %v, want: %v", got, want) 1498 } 1499 1500 // Test patterns to update instance labels 1501 tests := []struct { 1502 name string 1503 in map[string]string 1504 out map[string]string 1505 }{ 1506 { 1507 name: "update labels", 1508 in: map[string]string{"test-label-key": "test-label-value"}, 1509 out: map[string]string{"test-label-key": "test-label-value"}, 1510 }, 1511 { 1512 name: "update multiple labels", 1513 in: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1514 out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1515 }, 1516 { 1517 name: "not update existing labels", 1518 in: nil, // nil map 1519 out: map[string]string{"update-label-key-a": 
"update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1520 }, 1521 { 1522 name: "delete labels", 1523 in: map[string]string{}, // empty map 1524 out: nil, 1525 }, 1526 } 1527 for _, tt := range tests { 1528 t.Run(tt.name, func(t *testing.T) { 1529 confWithClusters := &InstanceWithClustersConfig{ 1530 InstanceID: instanceToCreate, 1531 Labels: tt.in, 1532 } 1533 if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1534 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1535 } 1536 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1537 if err != nil { 1538 t.Fatalf("InstanceInfo: %v", err) 1539 } 1540 if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) { 1541 t.Fatalf("Labels: %v, want: %v", got, want) 1542 } 1543 }) 1544 } 1545} 1546 1547func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) { 1548 if instanceToCreate == "" { 1549 t.Skip("instanceToCreate not set, skipping instance update testing") 1550 } 1551 1552 testEnv, err := NewIntegrationEnv() 1553 if err != nil { 1554 t.Fatalf("IntegrationEnv: %v", err) 1555 } 1556 defer testEnv.Close() 1557 1558 if !testEnv.Config().UseProd { 1559 t.Skip("emulator doesn't support instance creation") 1560 } 1561 1562 timeout := 5 * time.Minute 1563 ctx, _ := context.WithTimeout(context.Background(), timeout) 1564 1565 iAdminClient, err := testEnv.NewInstanceAdminClient() 1566 if err != nil { 1567 t.Fatalf("NewInstanceAdminClient: %v", err) 1568 } 1569 defer iAdminClient.Close() 1570 1571 clusterID := instanceToCreate + "-cluster" 1572 1573 // Create a development instance 1574 conf := &InstanceConf{ 1575 InstanceId: instanceToCreate, 1576 ClusterId: clusterID, 1577 DisplayName: "test instance", 1578 Zone: instanceToCreateZone, 1579 InstanceType: DEVELOPMENT, 1580 Labels: map[string]string{"test-label-key": "test-label-value"}, 1581 } 1582 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1583 t.Fatalf("CreateInstance: %v", err) 
1584 } 1585 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1586 1587 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1588 if err != nil { 1589 t.Fatalf("InstanceInfo: %v", err) 1590 } 1591 1592 // Basic return values are tested elsewhere, check instance type 1593 if iInfo.InstanceType != DEVELOPMENT { 1594 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1595 } 1596 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1597 t.Fatalf("Labels: %v, want: %v", got, want) 1598 } 1599 1600 // Update everything we can about the instance in one call. 1601 confWithClusters := &InstanceWithClustersConfig{ 1602 InstanceID: instanceToCreate, 1603 DisplayName: "new display name", 1604 InstanceType: PRODUCTION, 1605 Labels: map[string]string{"new-label-key": "new-label-value"}, 1606 Clusters: []ClusterConfig{ 1607 {ClusterID: clusterID, NumNodes: 5}, 1608 }, 1609 } 1610 1611 results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1612 if err != nil { 1613 t.Fatalf("UpdateInstanceAndSyncClusters: %v", err) 1614 } 1615 1616 wantResults := UpdateInstanceResults{ 1617 InstanceUpdated: true, 1618 UpdatedClusters: []string{clusterID}, 1619 } 1620 if diff := testutil.Diff(*results, wantResults); diff != "" { 1621 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1622 } 1623 1624 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1625 if err != nil { 1626 t.Fatalf("InstanceInfo: %v", err) 1627 } 1628 1629 if iInfo.InstanceType != PRODUCTION { 1630 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1631 } 1632 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1633 t.Fatalf("Labels: %v, want: %v", got, want) 1634 } 1635 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1636 t.Fatalf("Display name: %q, want: %q", got, want) 1637 } 1638 1639 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1640 if 
err != nil { 1641 t.Fatalf("GetCluster: %v", err) 1642 } 1643 1644 if cInfo.ServeNodes != 5 { 1645 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1646 } 1647 1648 // Now add a second cluster as the only change. The first cluster must also be provided so 1649 // it is not removed. 1650 clusterID2 := clusterID + "-2" 1651 confWithClusters = &InstanceWithClustersConfig{ 1652 InstanceID: instanceToCreate, 1653 Clusters: []ClusterConfig{ 1654 {ClusterID: clusterID}, 1655 {ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}, 1656 }, 1657 } 1658 1659 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1660 if err != nil { 1661 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1662 } 1663 1664 wantResults = UpdateInstanceResults{ 1665 InstanceUpdated: false, 1666 CreatedClusters: []string{clusterID2}, 1667 } 1668 if diff := testutil.Diff(*results, wantResults); diff != "" { 1669 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1670 } 1671 1672 // Now update one cluster and delete the other 1673 confWithClusters = &InstanceWithClustersConfig{ 1674 InstanceID: instanceToCreate, 1675 Clusters: []ClusterConfig{ 1676 {ClusterID: clusterID, NumNodes: 4}, 1677 }, 1678 } 1679 1680 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1681 if err != nil { 1682 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1683 } 1684 1685 wantResults = UpdateInstanceResults{ 1686 InstanceUpdated: false, 1687 UpdatedClusters: []string{clusterID}, 1688 DeletedClusters: []string{clusterID2}, 1689 } 1690 1691 if diff := testutil.Diff(*results, wantResults); diff != "" { 1692 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1693 } 1694 1695 // Make sure the instance looks as we would expect 1696 clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId) 1697 if err != nil { 1698 t.Fatalf("Clusters: %v", err) 1699 } 1700 1701 if 
len(clusters) != 1 { 1702 t.Fatalf("Clusters length %v, want: 1", len(clusters)) 1703 } 1704 1705 wantCluster := &ClusterInfo{ 1706 Name: clusterID, 1707 Zone: instanceToCreateZone, 1708 ServeNodes: 4, 1709 State: "READY", 1710 } 1711 if diff := testutil.Diff(clusters[0], wantCluster); diff != "" { 1712 t.Fatalf("InstanceEquality: got - want +\n%s", diff) 1713 } 1714} 1715 1716func TestIntegration_AdminSnapshot(t *testing.T) { 1717 testEnv, err := NewIntegrationEnv() 1718 if err != nil { 1719 t.Fatalf("IntegrationEnv: %v", err) 1720 } 1721 defer testEnv.Close() 1722 1723 if !testEnv.Config().UseProd { 1724 t.Skip("emulator doesn't support snapshots") 1725 } 1726 1727 timeout := 2 * time.Second 1728 if testEnv.Config().UseProd { 1729 timeout = 5 * time.Minute 1730 } 1731 ctx, _ := context.WithTimeout(context.Background(), timeout) 1732 1733 adminClient, err := testEnv.NewAdminClient() 1734 if err != nil { 1735 t.Fatalf("NewAdminClient: %v", err) 1736 } 1737 defer adminClient.Close() 1738 1739 table := testEnv.Config().Table 1740 cluster := testEnv.Config().Cluster 1741 1742 list := func(cluster string) ([]*SnapshotInfo, error) { 1743 infos := []*SnapshotInfo(nil) 1744 1745 it := adminClient.Snapshots(ctx, cluster) 1746 for { 1747 s, err := it.Next() 1748 if err == iterator.Done { 1749 break 1750 } 1751 if err != nil { 1752 return nil, err 1753 } 1754 infos = append(infos, s) 1755 } 1756 return infos, err 1757 } 1758 1759 // Delete the table at the end of the test. 
Schedule ahead of time 1760 // in case the client fails 1761 defer deleteTable(ctx, t, adminClient, table) 1762 1763 if err := adminClient.CreateTable(ctx, table); err != nil { 1764 t.Fatalf("Creating table: %v", err) 1765 } 1766 1767 // Precondition: no snapshots 1768 snapshots, err := list(cluster) 1769 if err != nil { 1770 t.Fatalf("Initial snapshot list: %v", err) 1771 } 1772 if got, want := len(snapshots), 0; got != want { 1773 t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) 1774 } 1775 1776 // Create snapshot 1777 defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") 1778 1779 if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { 1780 t.Fatalf("Creating snaphot: %v", err) 1781 } 1782 1783 // List snapshot 1784 snapshots, err = list(cluster) 1785 if err != nil { 1786 t.Fatalf("Listing snapshots: %v", err) 1787 } 1788 if got, want := len(snapshots), 1; got != want { 1789 t.Fatalf("Listing snapshot count: %d, want: %d", got, want) 1790 } 1791 if got, want := snapshots[0].Name, "mysnapshot"; got != want { 1792 t.Fatalf("Snapshot name: %s, want: %s", got, want) 1793 } 1794 if got, want := snapshots[0].SourceTable, table; got != want { 1795 t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) 1796 } 1797 if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 1798 t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) 1799 } 1800 1801 // Get snapshot 1802 snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") 1803 if err != nil { 1804 t.Fatalf("SnapshotInfo: %v", snapshot) 1805 } 1806 if got, want := *snapshot, *snapshots[0]; got != want { 1807 t.Fatalf("SnapshotInfo: %v, want: %v", got, want) 1808 } 1809 1810 // Restore 1811 restoredTable := table + "-restored" 1812 defer deleteTable(ctx, t, adminClient, restoredTable) 1813 if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); 
err != nil { 1814 t.Fatalf("CreateTableFromSnapshot: %v", err) 1815 } 1816 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 1817 t.Fatalf("Restored TableInfo: %v", err) 1818 } 1819 1820 // Delete snapshot 1821 if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { 1822 t.Fatalf("DeleteSnapshot: %v", err) 1823 } 1824 snapshots, err = list(cluster) 1825 if err != nil { 1826 t.Fatalf("List after Delete: %v", err) 1827 } 1828 if got, want := len(snapshots), 0; got != want { 1829 t.Fatalf("List after delete len: %d, want: %d", got, want) 1830 } 1831} 1832 1833// instanceAdminClientMock is used to test FailedLocations field processing. 1834type instanceAdminClientMock struct { 1835 Clusters []*btapb.Cluster 1836 UnavailableLocations []string 1837} 1838 1839func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) { 1840 res := btapb.ListClustersResponse{ 1841 Clusters: iacm.Clusters, 1842 FailedLocations: iacm.UnavailableLocations, 1843 } 1844 return &res, nil 1845} 1846 1847func (iacm *instanceAdminClientMock) CreateInstance(ctx context.Context, in *btapb.CreateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1848 return nil, nil 1849} 1850 1851func (iacm *instanceAdminClientMock) GetInstance(ctx context.Context, in *btapb.GetInstanceRequest, opts ...grpc.CallOption) (*btapb.Instance, error) { 1852 return nil, nil 1853} 1854 1855func (iacm *instanceAdminClientMock) ListInstances(ctx context.Context, in *btapb.ListInstancesRequest, opts ...grpc.CallOption) (*btapb.ListInstancesResponse, error) { 1856 return nil, nil 1857} 1858 1859func (iacm *instanceAdminClientMock) UpdateInstance(ctx context.Context, in *btapb.Instance, opts ...grpc.CallOption) (*btapb.Instance, error) { 1860 return nil, nil 1861} 1862 1863func (iacm *instanceAdminClientMock) PartialUpdateInstance(ctx context.Context, in 
*btapb.PartialUpdateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1864 return nil, nil 1865} 1866 1867func (iacm *instanceAdminClientMock) DeleteInstance(ctx context.Context, in *btapb.DeleteInstanceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 1868 return nil, nil 1869} 1870 1871func (iacm *instanceAdminClientMock) CreateCluster(ctx context.Context, in *btapb.CreateClusterRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1872 return nil, nil 1873} 1874 1875func (iacm *instanceAdminClientMock) GetCluster(ctx context.Context, in *btapb.GetClusterRequest, opts ...grpc.CallOption) (*btapb.Cluster, error) { 1876 return nil, nil 1877} 1878 1879func (iacm *instanceAdminClientMock) UpdateCluster(ctx context.Context, in *btapb.Cluster, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1880 return nil, nil 1881} 1882 1883func (iacm *instanceAdminClientMock) DeleteCluster(ctx context.Context, in *btapb.DeleteClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 1884 return nil, nil 1885} 1886 1887func (iacm *instanceAdminClientMock) CreateAppProfile(ctx context.Context, in *btapb.CreateAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) { 1888 return nil, nil 1889} 1890 1891func (iacm *instanceAdminClientMock) GetAppProfile(ctx context.Context, in *btapb.GetAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) { 1892 return nil, nil 1893} 1894 1895func (iacm *instanceAdminClientMock) ListAppProfiles(ctx context.Context, in *btapb.ListAppProfilesRequest, opts ...grpc.CallOption) (*btapb.ListAppProfilesResponse, error) { 1896 return nil, nil 1897} 1898 1899func (iacm *instanceAdminClientMock) UpdateAppProfile(ctx context.Context, in *btapb.UpdateAppProfileRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1900 return nil, nil 1901} 1902 1903func (iacm *instanceAdminClientMock) DeleteAppProfile(ctx context.Context, in 
*btapb.DeleteAppProfileRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 1904 return nil, nil 1905} 1906 1907func (iacm *instanceAdminClientMock) GetIamPolicy(ctx context.Context, in *v1.GetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) { 1908 return nil, nil 1909} 1910 1911func (iacm *instanceAdminClientMock) SetIamPolicy(ctx context.Context, in *v1.SetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) { 1912 return nil, nil 1913} 1914 1915func (iacm *instanceAdminClientMock) TestIamPermissions(ctx context.Context, in *v1.TestIamPermissionsRequest, opts ...grpc.CallOption) (*v1.TestIamPermissionsResponse, error) { 1916 return nil, nil 1917} 1918 1919func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) { 1920 testEnv, err := NewIntegrationEnv() 1921 if err != nil { 1922 t.Fatalf("IntegrationEnv: %v", err) 1923 } 1924 defer testEnv.Close() 1925 1926 if !testEnv.Config().UseProd { 1927 t.Skip("emulator doesn't support snapshots") 1928 } 1929 1930 iAdminClient, err := testEnv.NewInstanceAdminClient() 1931 if err != nil { 1932 t.Fatalf("NewInstanceAdminClient: %v", err) 1933 } 1934 defer iAdminClient.Close() 1935 1936 cluster1 := btapb.Cluster{Name: "cluster1"} 1937 failedLoc := "euro1" 1938 1939 iAdminClient.iClient = &instanceAdminClientMock{ 1940 Clusters: []*btapb.Cluster{&cluster1}, 1941 UnavailableLocations: []string{failedLoc}, 1942 } 1943 1944 cis, err := iAdminClient.Clusters(context.Background(), "instance-id") 1945 convertedErr, ok := err.(ErrPartiallyUnavailable) 1946 if !ok { 1947 t.Fatalf("want error ErrPartiallyUnavailable, got other") 1948 } 1949 if got, want := len(convertedErr.Locations), 1; got != want { 1950 t.Fatalf("want %v failed locations, got %v", want, got) 1951 } 1952 if got, want := convertedErr.Locations[0], failedLoc; got != want { 1953 t.Fatalf("want failed location %v, got %v", want, got) 1954 } 1955 if got, want := len(cis), 1; got != want { 1956 t.Fatalf("want %v 
failed locations, got %v", want, got) 1957 } 1958 if got, want := cis[0].Name, cluster1.Name; got != want { 1959 t.Fatalf("want cluster %v, got %v", want, got) 1960 } 1961} 1962 1963func TestIntegration_Granularity(t *testing.T) { 1964 testEnv, err := NewIntegrationEnv() 1965 if err != nil { 1966 t.Fatalf("IntegrationEnv: %v", err) 1967 } 1968 defer testEnv.Close() 1969 1970 timeout := 2 * time.Second 1971 if testEnv.Config().UseProd { 1972 timeout = 5 * time.Minute 1973 } 1974 ctx, _ := context.WithTimeout(context.Background(), timeout) 1975 1976 adminClient, err := testEnv.NewAdminClient() 1977 if err != nil { 1978 t.Fatalf("NewAdminClient: %v", err) 1979 } 1980 defer adminClient.Close() 1981 1982 list := func() []string { 1983 tbls, err := adminClient.Tables(ctx) 1984 if err != nil { 1985 t.Fatalf("Fetching list of tables: %v", err) 1986 } 1987 sort.Strings(tbls) 1988 return tbls 1989 } 1990 containsAll := func(got, want []string) bool { 1991 gotSet := make(map[string]bool) 1992 1993 for _, s := range got { 1994 gotSet[s] = true 1995 } 1996 for _, s := range want { 1997 if !gotSet[s] { 1998 return false 1999 } 2000 } 2001 return true 2002 } 2003 2004 defer deleteTable(ctx, t, adminClient, "mytable") 2005 2006 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 2007 t.Fatalf("Creating table: %v", err) 2008 } 2009 2010 tables := list() 2011 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 2012 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 2013 } 2014 2015 // calling ModifyColumnFamilies to check the granularity of table 2016 prefix := adminClient.instancePrefix() 2017 req := &btapb.ModifyColumnFamiliesRequest{ 2018 Name: prefix + "/tables/" + "mytable", 2019 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 2020 Id: "cf", 2021 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, 2022 }}, 2023 } 2024 table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req) 
2025 if err != nil { 2026 t.Fatalf("Creating column family: %v", err) 2027 } 2028 if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { 2029 t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) 2030 } 2031} 2032 2033func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { 2034 testEnv, err := NewIntegrationEnv() 2035 if err != nil { 2036 t.Fatalf("IntegrationEnv: %v", err) 2037 } 2038 defer testEnv.Close() 2039 2040 timeout := 2 * time.Second 2041 if testEnv.Config().UseProd { 2042 timeout = 5 * time.Minute 2043 } 2044 ctx, cancel := context.WithTimeout(context.Background(), timeout) 2045 defer cancel() 2046 2047 adminClient, err := testEnv.NewAdminClient() 2048 if err != nil { 2049 t.Fatalf("NewAdminClient: %v", err) 2050 } 2051 defer adminClient.Close() 2052 2053 iAdminClient, err := testEnv.NewInstanceAdminClient() 2054 if err != nil { 2055 t.Fatalf("NewInstanceAdminClient: %v", err) 2056 } 2057 2058 if iAdminClient == nil { 2059 return 2060 } 2061 2062 defer iAdminClient.Close() 2063 profile := ProfileConf{ 2064 ProfileID: "app_profile1", 2065 InstanceID: adminClient.instance, 2066 ClusterID: testEnv.Config().Cluster, 2067 Description: "creating new app profile 1", 2068 RoutingPolicy: SingleClusterRouting, 2069 } 2070 2071 createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) 2072 if err != nil { 2073 t.Fatalf("Creating app profile: %v", err) 2074 } 2075 2076 gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 2077 if err != nil { 2078 t.Fatalf("Get app profile: %v", err) 2079 } 2080 2081 if !proto.Equal(createdProfile, gotProfile) { 2082 t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) 2083 } 2084 2085 list := func(instanceID string) ([]*btapb.AppProfile, error) { 2086 profiles := []*btapb.AppProfile(nil) 2087 2088 it := 
iAdminClient.ListAppProfiles(ctx, instanceID) 2089 for { 2090 s, err := it.Next() 2091 if err == iterator.Done { 2092 break 2093 } 2094 if err != nil { 2095 return nil, err 2096 } 2097 profiles = append(profiles, s) 2098 } 2099 return profiles, err 2100 } 2101 2102 profiles, err := list(adminClient.instance) 2103 if err != nil { 2104 t.Fatalf("List app profile: %v", err) 2105 } 2106 2107 if got, want := len(profiles), 1; got != want { 2108 t.Fatalf("Initial app profile list len: %d, want: %d", got, want) 2109 } 2110 2111 for _, test := range []struct { 2112 desc string 2113 uattrs ProfileAttrsToUpdate 2114 want *btapb.AppProfile // nil means error 2115 }{ 2116 { 2117 desc: "empty update", 2118 uattrs: ProfileAttrsToUpdate{}, 2119 want: nil, 2120 }, 2121 2122 { 2123 desc: "empty description update", 2124 uattrs: ProfileAttrsToUpdate{Description: ""}, 2125 want: &btapb.AppProfile{ 2126 Name: gotProfile.Name, 2127 Description: "", 2128 RoutingPolicy: gotProfile.RoutingPolicy, 2129 Etag: gotProfile.Etag, 2130 }, 2131 }, 2132 { 2133 desc: "routing update", 2134 uattrs: ProfileAttrsToUpdate{ 2135 RoutingPolicy: SingleClusterRouting, 2136 ClusterID: testEnv.Config().Cluster, 2137 }, 2138 want: &btapb.AppProfile{ 2139 Name: gotProfile.Name, 2140 Description: "", 2141 Etag: gotProfile.Etag, 2142 RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ 2143 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 2144 ClusterId: testEnv.Config().Cluster, 2145 }, 2146 }, 2147 }, 2148 }, 2149 } { 2150 err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) 2151 if err != nil { 2152 if test.want != nil { 2153 t.Errorf("%s: %v", test.desc, err) 2154 } 2155 continue 2156 } 2157 if err == nil && test.want == nil { 2158 t.Errorf("%s: got nil, want error", test.desc) 2159 continue 2160 } 2161 2162 got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 2163 2164 if !proto.Equal(got, test.want) { 2165 t.Fatalf("%s : got 
profile : %v, want profile: %v", test.desc, gotProfile, test.want) 2166 } 2167 2168 } 2169 2170 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 2171 if err != nil { 2172 t.Fatalf("Delete app profile: %v", err) 2173 } 2174} 2175 2176func TestIntegration_InstanceUpdate(t *testing.T) { 2177 testEnv, err := NewIntegrationEnv() 2178 if err != nil { 2179 t.Fatalf("IntegrationEnv: %v", err) 2180 } 2181 defer testEnv.Close() 2182 2183 timeout := 2 * time.Second 2184 if testEnv.Config().UseProd { 2185 timeout = 5 * time.Minute 2186 } 2187 ctx, cancel := context.WithTimeout(context.Background(), timeout) 2188 defer cancel() 2189 2190 adminClient, err := testEnv.NewAdminClient() 2191 if err != nil { 2192 t.Fatalf("NewAdminClient: %v", err) 2193 } 2194 2195 defer adminClient.Close() 2196 2197 iAdminClient, err := testEnv.NewInstanceAdminClient() 2198 if err != nil { 2199 t.Fatalf("NewInstanceAdminClient: %v", err) 2200 } 2201 2202 if iAdminClient == nil { 2203 return 2204 } 2205 2206 defer iAdminClient.Close() 2207 2208 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 2209 if err != nil { 2210 t.Errorf("InstanceInfo: %v", err) 2211 } 2212 2213 if iInfo.Name != adminClient.instance { 2214 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 2215 } 2216 2217 if iInfo.DisplayName != adminClient.instance { 2218 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 2219 } 2220 2221 const numNodes = 4 2222 // update cluster nodes 2223 if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { 2224 t.Errorf("UpdateCluster: %v", err) 2225 } 2226 2227 // get cluster after updating 2228 cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster) 2229 if err != nil { 2230 t.Errorf("GetCluster %v", err) 2231 } 2232 if cis.ServeNodes != int(numNodes) { 2233 t.Errorf("ServeNodes returned 
%d, want %d", cis.ServeNodes, int(numNodes)) 2234 } 2235} 2236 2237func TestIntegration_AdminBackup(t *testing.T) { 2238 testEnv, err := NewIntegrationEnv() 2239 if err != nil { 2240 t.Fatalf("IntegrationEnv: %v", err) 2241 } 2242 defer testEnv.Close() 2243 2244 if !testEnv.Config().UseProd { 2245 t.Skip("emulator doesn't support backups") 2246 } 2247 2248 timeout := 5 * time.Minute 2249 ctx, _ := context.WithTimeout(context.Background(), timeout) 2250 2251 adminClient, err := testEnv.NewAdminClient() 2252 if err != nil { 2253 t.Fatalf("NewAdminClient: %v", err) 2254 } 2255 defer adminClient.Close() 2256 2257 table := testEnv.Config().Table 2258 cluster := testEnv.Config().Cluster 2259 2260 // Delete the table at the end of the test. Schedule ahead of time 2261 // in case the client fails 2262 defer deleteTable(ctx, t, adminClient, table) 2263 2264 list := func(cluster string) ([]*BackupInfo, error) { 2265 infos := []*BackupInfo(nil) 2266 2267 it := adminClient.Backups(ctx, cluster) 2268 for { 2269 s, err := it.Next() 2270 if err == iterator.Done { 2271 break 2272 } 2273 if err != nil { 2274 return nil, err 2275 } 2276 infos = append(infos, s) 2277 } 2278 return infos, err 2279 } 2280 2281 if err := adminClient.CreateTable(ctx, table); err != nil { 2282 t.Fatalf("Creating table: %v", err) 2283 } 2284 2285 // Precondition: no backups 2286 backups, err := list(cluster) 2287 if err != nil { 2288 t.Fatalf("Initial backup list: %v", err) 2289 } 2290 if got, want := len(backups), 0; got != want { 2291 t.Fatalf("Initial backup list len: %d, want: %d", got, want) 2292 } 2293 2294 // Create backup 2295 backupName := "mybackup" 2296 defer adminClient.DeleteBackup(ctx, cluster, "mybackup") 2297 2298 if err = adminClient.CreateBackup(ctx, table, cluster, backupName, time.Now().Add(8*time.Hour)); err != nil { 2299 t.Fatalf("Creating backup: %v", err) 2300 } 2301 2302 // List backup 2303 backups, err = list(cluster) 2304 if err != nil { 2305 t.Fatalf("Listing backups: %v", err) 
2306 } 2307 if got, want := len(backups), 1; got != want { 2308 t.Fatalf("Listing backup count: %d, want: %d", got, want) 2309 } 2310 if got, want := backups[0].Name, backupName; got != want { 2311 t.Fatalf("Backup name: %s, want: %s", got, want) 2312 } 2313 if got, want := backups[0].SourceTable, table; got != want { 2314 t.Fatalf("Backup SourceTable: %s, want: %s", got, want) 2315 } 2316 if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 2317 t.Fatalf("Backup ExpireTime: %s, want: %s", got, want) 2318 } 2319 2320 // Get backup 2321 backup, err := adminClient.BackupInfo(ctx, cluster, backupName) 2322 if err != nil { 2323 t.Fatalf("BackupInfo: %v", backup) 2324 } 2325 if got, want := *backup, *backups[0]; got != want { 2326 t.Fatalf("BackupInfo: %v, want: %v", got, want) 2327 } 2328 2329 // Update backup 2330 newExpireTime := time.Now().Add(10 * time.Hour) 2331 err = adminClient.UpdateBackup(ctx, cluster, backupName, newExpireTime) 2332 if err != nil { 2333 t.Fatalf("UpdateBackup failed: %v", err) 2334 } 2335 2336 // Check that updated backup has the correct expire time 2337 updatedBackup, err := adminClient.BackupInfo(ctx, cluster, backupName) 2338 if err != nil { 2339 t.Fatalf("BackupInfo: %v", err) 2340 } 2341 backup.ExpireTime = newExpireTime 2342 // Server clock and local clock may not be perfectly sync'ed. 
2343 if got, want := *updatedBackup, *backup; got.ExpireTime.Sub(want.ExpireTime) > time.Minute { 2344 t.Fatalf("BackupInfo: %v, want: %v", got, want) 2345 } 2346 2347 // Restore backup 2348 restoredTable := table + "-restored" 2349 defer deleteTable(ctx, t, adminClient, restoredTable) 2350 if err = adminClient.RestoreTable(ctx, restoredTable, cluster, backupName); err != nil { 2351 t.Fatalf("RestoreTable: %v", err) 2352 } 2353 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 2354 t.Fatalf("Restored TableInfo: %v", err) 2355 } 2356 2357 // Delete backup 2358 if err = adminClient.DeleteBackup(ctx, cluster, backupName); err != nil { 2359 t.Fatalf("DeleteBackup: %v", err) 2360 } 2361 backups, err = list(cluster) 2362 if err != nil { 2363 t.Fatalf("List after Delete: %v", err) 2364 } 2365 if got, want := len(backups), 0; got != want { 2366 t.Fatalf("List after delete len: %d, want: %d", got, want) 2367 } 2368} 2369 2370// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. 2371func TestIntegration_DirectPathFallback(t *testing.T) { 2372 ctx := context.Background() 2373 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 2374 if err != nil { 2375 t.Fatal(err) 2376 } 2377 defer cleanup() 2378 2379 if !testEnv.Config().AttemptDirectPath { 2380 t.Skip() 2381 } 2382 2383 if len(blackholeDpv6Cmd) == 0 { 2384 t.Fatal("-it.blackhole-dpv6-cmd unset") 2385 } 2386 if len(blackholeDpv4Cmd) == 0 { 2387 t.Fatal("-it.blackhole-dpv4-cmd unset") 2388 } 2389 if len(allowDpv6Cmd) == 0 { 2390 t.Fatal("-it.allowdpv6-cmd unset") 2391 } 2392 if len(allowDpv4Cmd) == 0 { 2393 t.Fatal("-it.allowdpv4-cmd unset") 2394 } 2395 2396 if err := populatePresidentsGraph(table); err != nil { 2397 t.Fatal(err) 2398 } 2399 2400 // Precondition: wait for DirectPath to connect. 
2401 dpEnabled := examineTraffic(ctx, testEnv, table, false) 2402 if !dpEnabled { 2403 t.Fatalf("Failed to observe RPCs over DirectPath") 2404 } 2405 2406 // Enable the blackhole, which will prevent communication with grpclb and thus DirectPath. 2407 blackholeDirectPath(testEnv, t) 2408 dpDisabled := examineTraffic(ctx, testEnv, table, true) 2409 if !dpDisabled { 2410 t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") 2411 } 2412 2413 // Disable the blackhole, and client should use DirectPath again. 2414 allowDirectPath(testEnv, t) 2415 dpEnabled = examineTraffic(ctx, testEnv, table, false) 2416 if !dpEnabled { 2417 t.Fatalf("Failed to fallback to CFE after blackhole DirectPath") 2418 } 2419} 2420 2421// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true). 2422func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool { 2423 numCount := 0 2424 const ( 2425 numRPCsToSend = 20 2426 minCompleteRPC = 40 2427 ) 2428 2429 start := time.Now() 2430 for time.Since(start) < 2*time.Minute { 2431 for i := 0; i < numRPCsToSend; i++ { 2432 _, _ = table.ReadRow(ctx, "j§adams") 2433 if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP { 2434 numCount++ 2435 if numCount >= minCompleteRPC { 2436 return true 2437 } 2438 } 2439 time.Sleep(100 * time.Millisecond) 2440 } 2441 } 2442 return false 2443} 2444 2445func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { 2446 testEnv, err := NewIntegrationEnv() 2447 if err != nil { 2448 return nil, nil, nil, nil, "", nil, err 2449 } 2450 2451 var timeout time.Duration 2452 if testEnv.Config().UseProd { 2453 timeout = 10 * time.Minute 2454 t.Logf("Running test against production") 2455 } else { 2456 timeout = 5 * time.Minute 2457 t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) 2458 } 2459 ctx, 
cancel := context.WithTimeout(ctx, timeout) 2460 2461 client, err := testEnv.NewClient() 2462 if err != nil { 2463 return nil, nil, nil, nil, "", nil, err 2464 } 2465 2466 adminClient, err := testEnv.NewAdminClient() 2467 if err != nil { 2468 return nil, nil, nil, nil, "", nil, err 2469 } 2470 2471 if testEnv.Config().UseProd { 2472 // TODO: tables may not be successfully deleted in some cases, and will 2473 // become obsolete. We may need a way to automatically delete them. 2474 tableName = tableNameSpace.New() 2475 } else { 2476 tableName = testEnv.Config().Table 2477 } 2478 2479 if err := adminClient.CreateTable(ctx, tableName); err != nil { 2480 cancel() 2481 return nil, nil, nil, nil, "", nil, err 2482 } 2483 if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { 2484 cancel() 2485 return nil, nil, nil, nil, "", nil, err 2486 } 2487 2488 return testEnv, client, adminClient, client.Open(tableName), tableName, func() { 2489 if err := adminClient.DeleteTable(ctx, tableName); err != nil { 2490 t.Errorf("DeleteTable got error %v", err) 2491 } 2492 cancel() 2493 client.Close() 2494 adminClient.Close() 2495 }, nil 2496} 2497 2498func formatReadItem(ri ReadItem) string { 2499 // Use the column qualifier only to make the test data briefer. 
2500 col := ri.Column[strings.Index(ri.Column, ":")+1:] 2501 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 2502} 2503 2504func fill(b, sub []byte) { 2505 for len(b) > len(sub) { 2506 n := copy(b, sub) 2507 b = b[n:] 2508 } 2509} 2510 2511func clearTimestamps(r Row) { 2512 for _, ris := range r { 2513 for i := range ris { 2514 ris[i].Timestamp = 0 2515 } 2516 } 2517} 2518 2519func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) { 2520 bo := gax.Backoff{ 2521 Initial: 100 * time.Millisecond, 2522 Max: 2 * time.Second, 2523 Multiplier: 1.2, 2524 } 2525 ctx, _ = context.WithTimeout(ctx, time.Second*30) 2526 2527 err := internal.Retry(ctx, bo, func() (bool, error) { 2528 err := ac.DeleteTable(ctx, name) 2529 if err != nil { 2530 return false, err 2531 } 2532 return true, nil 2533 }) 2534 if err != nil { 2535 t.Logf("DeleteTable: %v", err) 2536 } 2537} 2538 2539func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) { 2540 t.Helper() 2541 if !testEnv.Config().AttemptDirectPath { 2542 return 2543 } 2544 if remoteIP, res := isDirectPathRemoteAddress(testEnv); !res { 2545 if testEnv.Config().DirectPathIPV4Only { 2546 t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP) 2547 } else { 2548 t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP) 2549 } 2550 } 2551} 2552 2553func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) { 2554 remoteIP := testEnv.Peer().Addr.String() 2555 // DirectPath ipv4-only can only use ipv4 traffic. 2556 if testEnv.Config().DirectPathIPV4Only { 2557 return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) 2558 } 2559 // DirectPath ipv6 can use either ipv4 or ipv6 traffic. 
2560 return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix) 2561} 2562 2563func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) { 2564 cmdRes := exec.Command("bash", "-c", blackholeDpv4Cmd) 2565 out, _ := cmdRes.CombinedOutput() 2566 t.Logf(string(out)) 2567 if testEnv.Config().DirectPathIPV4Only { 2568 return 2569 } 2570 cmdRes = exec.Command("bash", "-c", blackholeDpv6Cmd) 2571 out, _ = cmdRes.CombinedOutput() 2572 t.Logf(string(out)) 2573} 2574 2575func allowDirectPath(testEnv IntegrationEnv, t *testing.T) { 2576 cmdRes := exec.Command("bash", "-c", allowDpv4Cmd) 2577 out, _ := cmdRes.CombinedOutput() 2578 t.Logf(string(out)) 2579 if testEnv.Config().DirectPathIPV4Only { 2580 return 2581 } 2582 cmdRes = exec.Command("bash", "-c", allowDpv6Cmd) 2583 out, _ = cmdRes.CombinedOutput() 2584 t.Logf(string(out)) 2585} 2586