1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "flag" 22 "fmt" 23 "math" 24 "math/rand" 25 "os/exec" 26 "reflect" 27 "sort" 28 "strings" 29 "sync" 30 "testing" 31 "time" 32 33 "cloud.google.com/go/iam" 34 "cloud.google.com/go/internal" 35 "cloud.google.com/go/internal/testutil" 36 "cloud.google.com/go/internal/uid" 37 "github.com/golang/protobuf/proto" 38 "github.com/google/go-cmp/cmp" 39 gax "github.com/googleapis/gax-go/v2" 40 "google.golang.org/api/iterator" 41 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 42 v1 "google.golang.org/genproto/googleapis/iam/v1" 43 longrunning "google.golang.org/genproto/googleapis/longrunning" 44 grpc "google.golang.org/grpc" 45 "google.golang.org/protobuf/types/known/emptypb" 46) 47 48const ( 49 directPathIPV6Prefix = "[2001:4860:8040" 50 directPathIPV4Prefix = "34.126" 51) 52 53var ( 54 presidentsSocialGraph = map[string][]string{ 55 "wmckinley": {"tjefferson"}, 56 "gwashington": {"jadams"}, 57 "tjefferson": {"gwashington", "jadams"}, 58 "jadams": {"gwashington", "tjefferson"}, 59 } 60 61 tableNameSpace = uid.NewSpace("cbt-test", &uid.Options{Short: true}) 62) 63 64func populatePresidentsGraph(table *Table) error { 65 ctx := context.Background() 66 for row, ss := range presidentsSocialGraph { 67 mut := NewMutation() 68 for _, name := range ss { 69 mut.Set("follows", name, 1000, []byte("1")) 70 } 71 if err := table.Apply(ctx, row, mut); err != 
nil { 72 return fmt.Errorf("Mutating row %q: %v", row, err) 73 } 74 } 75 return nil 76} 77 78var instanceToCreate string 79var instanceToCreateZone string 80var instanceToCreateZone2 string 81var blackholeDpv6Cmd string 82var blackholeDpv4Cmd string 83var allowDpv6Cmd string 84var allowDpv4Cmd string 85 86func init() { 87 // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. 88 flag.StringVar(&instanceToCreate, "it.instance-to-create", "", 89 "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.") 90 flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b", 91 "The zone in which to create the new test instance.") 92 flag.StringVar(&instanceToCreateZone2, "it.instance-to-create-zone2", "us-east1-c", 93 "The zone in which to create a second cluster in the test instance.") 94 // Use sysctl or iptables to blackhole DirectPath IP for fallback tests. 95 flag.StringVar(&blackholeDpv6Cmd, "it.blackhole-dpv6-cmd", "", "Command to make LB and backend addresses blackholed over dpv6") 96 flag.StringVar(&blackholeDpv4Cmd, "it.blackhole-dpv4-cmd", "", "Command to make LB and backend addresses blackholed over dpv4") 97 flag.StringVar(&allowDpv6Cmd, "it.allow-dpv6-cmd", "", "Command to make LB and backend addresses allowed over dpv6") 98 flag.StringVar(&allowDpv4Cmd, "it.allow-dpv4-cmd", "", "Command to make LB and backend addresses allowed over dpv4") 99} 100 101func TestIntegration_ConditionalMutations(t *testing.T) { 102 ctx := context.Background() 103 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 104 if err != nil { 105 t.Fatal(err) 106 } 107 defer cleanup() 108 109 if err := populatePresidentsGraph(table); err != nil { 110 t.Fatal(err) 111 } 112 113 // Do a conditional mutation with a complex filter. 
114 mutTrue := NewMutation() 115 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 116 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 117 mut := NewCondMutation(filter, mutTrue, nil) 118 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 119 t.Fatalf("Conditionally mutating row: %v", err) 120 } 121 verifyDirectPathRemoteAddress(testEnv, t) 122 // Do a second condition mutation with a filter that does not match, 123 // and thus no changes should be made. 124 mutTrue = NewMutation() 125 mutTrue.DeleteRow() 126 filter = ColumnFilter("snoop.dogg") 127 mut = NewCondMutation(filter, mutTrue, nil) 128 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 129 t.Fatalf("Conditionally mutating row: %v", err) 130 } 131 verifyDirectPathRemoteAddress(testEnv, t) 132 133 // Fetch a row. 134 row, err := table.ReadRow(ctx, "jadams") 135 if err != nil { 136 t.Fatalf("Reading a row: %v", err) 137 } 138 verifyDirectPathRemoteAddress(testEnv, t) 139 wantRow := Row{ 140 "follows": []ReadItem{ 141 {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 142 {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 143 }, 144 } 145 if !testutil.Equal(row, wantRow) { 146 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 147 } 148} 149 150func TestIntegration_PartialReadRows(t *testing.T) { 151 ctx := context.Background() 152 _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 153 if err != nil { 154 t.Fatal(err) 155 } 156 defer cleanup() 157 158 if err := populatePresidentsGraph(table); err != nil { 159 t.Fatal(err) 160 } 161 162 // Do a scan and stop part way through. 163 // Verify that the ReadRows callback doesn't keep running. 
164 stopped := false 165 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 166 if r.Key() < "h" { 167 return true 168 } 169 if !stopped { 170 stopped = true 171 return false 172 } 173 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 174 return false 175 }) 176 if err != nil { 177 t.Fatalf("Partial ReadRows: %v", err) 178 } 179} 180 181func TestIntegration_ReadRowList(t *testing.T) { 182 ctx := context.Background() 183 _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 184 if err != nil { 185 t.Fatal(err) 186 } 187 defer cleanup() 188 189 if err := populatePresidentsGraph(table); err != nil { 190 t.Fatal(err) 191 } 192 193 // Read a RowList 194 var elt []string 195 keys := RowList{"wmckinley", "gwashington", "jadams"} 196 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 197 err = table.ReadRows(ctx, keys, func(r Row) bool { 198 for _, ris := range r { 199 for _, ri := range ris { 200 elt = append(elt, formatReadItem(ri)) 201 } 202 } 203 return true 204 }) 205 if err != nil { 206 t.Fatalf("read RowList: %v", err) 207 } 208 209 if got := strings.Join(elt, ","); got != want { 210 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 211 } 212} 213 214func TestIntegration_DeleteRow(t *testing.T) { 215 ctx := context.Background() 216 _, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 217 if err != nil { 218 t.Fatal(err) 219 } 220 defer cleanup() 221 222 if err := populatePresidentsGraph(table); err != nil { 223 t.Fatal(err) 224 } 225 226 // Delete a row and check it goes away. 
227 mut := NewMutation() 228 mut.DeleteRow() 229 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 230 t.Fatalf("Apply DeleteRow: %v", err) 231 } 232 row, err := table.ReadRow(ctx, "wmckinley") 233 if err != nil { 234 t.Fatalf("Reading a row after DeleteRow: %v", err) 235 } 236 if len(row) != 0 { 237 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 238 } 239} 240 241func TestIntegration_ReadModifyWrite(t *testing.T) { 242 ctx := context.Background() 243 testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 244 if err != nil { 245 t.Fatal(err) 246 } 247 defer cleanup() 248 249 if err := populatePresidentsGraph(table); err != nil { 250 t.Fatal(err) 251 } 252 253 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 254 t.Fatalf("Creating column family: %v", err) 255 } 256 257 appendRMW := func(b []byte) *ReadModifyWrite { 258 rmw := NewReadModifyWrite() 259 rmw.AppendValue("counter", "likes", b) 260 return rmw 261 } 262 incRMW := func(n int64) *ReadModifyWrite { 263 rmw := NewReadModifyWrite() 264 rmw.Increment("counter", "likes", n) 265 return rmw 266 } 267 rmwSeq := []struct { 268 desc string 269 rmw *ReadModifyWrite 270 want []byte 271 }{ 272 { 273 desc: "append #1", 274 rmw: appendRMW([]byte{0, 0, 0}), 275 want: []byte{0, 0, 0}, 276 }, 277 { 278 desc: "append #2", 279 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 280 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 281 }, 282 { 283 desc: "increment", 284 rmw: incRMW(8), 285 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 286 }, 287 } 288 for _, step := range rmwSeq { 289 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 290 if err != nil { 291 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 292 } 293 verifyDirectPathRemoteAddress(testEnv, t) 294 // Make sure the modified cell returned by the RMW operation has a timestamp. 
295 if row["counter"][0].Timestamp == 0 { 296 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 297 } 298 clearTimestamps(row) 299 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 300 if !testutil.Equal(row, wantRow) { 301 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 302 } 303 } 304 305 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 306 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 307 if err != nil { 308 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 309 } 310 verifyDirectPathRemoteAddress(testEnv, t) 311 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 312 if err != nil { 313 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 314 } 315 verifyDirectPathRemoteAddress(testEnv, t) 316 // Get only the correct row back on read. 317 r, err := table.ReadRow(ctx, "issue-723-1") 318 if err != nil { 319 t.Fatalf("Reading row: %v", err) 320 } 321 verifyDirectPathRemoteAddress(testEnv, t) 322 if r.Key() != "issue-723-1" { 323 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 324 } 325} 326 327func TestIntegration_ArbitraryTimestamps(t *testing.T) { 328 ctx := context.Background() 329 _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 330 if err != nil { 331 t.Fatal(err) 332 } 333 defer cleanup() 334 335 // Test arbitrary timestamps more thoroughly. 336 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 337 t.Fatalf("Creating column family: %v", err) 338 } 339 const numVersions = 4 340 mut := NewMutation() 341 for i := 1; i < numVersions; i++ { 342 // Timestamps are used in thousands because the server 343 // only permits that granularity. 
344 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 345 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 346 } 347 if err := table.Apply(ctx, "testrow", mut); err != nil { 348 t.Fatalf("Mutating row: %v", err) 349 } 350 r, err := table.ReadRow(ctx, "testrow") 351 if err != nil { 352 t.Fatalf("Reading row: %v", err) 353 } 354 wantRow := Row{"ts": []ReadItem{ 355 // These should be returned in descending timestamp order. 356 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 357 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 358 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 359 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 360 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 361 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 362 }} 363 if !testutil.Equal(r, wantRow) { 364 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 365 } 366 367 // Do the same read, but filter to the latest two versions. 
368 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 369 if err != nil { 370 t.Fatalf("Reading row: %v", err) 371 } 372 wantRow = Row{"ts": []ReadItem{ 373 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 374 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 375 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 376 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 377 }} 378 if !testutil.Equal(r, wantRow) { 379 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 380 } 381 // Check cell offset / limit 382 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 383 if err != nil { 384 t.Fatalf("Reading row: %v", err) 385 } 386 wantRow = Row{"ts": []ReadItem{ 387 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 388 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 389 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 390 }} 391 if !testutil.Equal(r, wantRow) { 392 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 393 } 394 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) 395 if err != nil { 396 t.Fatalf("Reading row: %v", err) 397 } 398 wantRow = Row{"ts": []ReadItem{ 399 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 400 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 401 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 402 }} 403 if !testutil.Equal(r, wantRow) { 404 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 405 } 406 // Check timestamp range filtering (with truncation) 407 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 408 if err 
!= nil { 409 t.Fatalf("Reading row: %v", err) 410 } 411 wantRow = Row{"ts": []ReadItem{ 412 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 413 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 414 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 415 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 416 }} 417 if !testutil.Equal(r, wantRow) { 418 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 419 } 420 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 421 if err != nil { 422 t.Fatalf("Reading row: %v", err) 423 } 424 wantRow = Row{"ts": []ReadItem{ 425 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 426 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 427 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 428 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 429 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 430 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 431 }} 432 if !testutil.Equal(r, wantRow) { 433 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 434 } 435 // Delete non-existing cells, no such column family in this row 436 // Should not delete anything 437 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 438 t.Fatalf("Creating column family: %v", err) 439 } 440 mut = NewMutation() 441 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 442 if err := table.Apply(ctx, "testrow", mut); err != nil { 443 t.Fatalf("Mutating row: %v", err) 444 } 445 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 446 if err != nil { 447 t.Fatalf("Reading row: %v", 
err) 448 } 449 if !testutil.Equal(r, wantRow) { 450 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 451 } 452 // Delete non-existing cells, no such column in this column family 453 // Should not delete anything 454 mut = NewMutation() 455 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 456 if err := table.Apply(ctx, "testrow", mut); err != nil { 457 t.Fatalf("Mutating row: %v", err) 458 } 459 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 460 if err != nil { 461 t.Fatalf("Reading row: %v", err) 462 } 463 if !testutil.Equal(r, wantRow) { 464 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 465 } 466 // Delete the cell with timestamp 2000 and repeat the last read, 467 // checking that we get ts 3000 and ts 1000. 468 mut = NewMutation() 469 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 470 if err := table.Apply(ctx, "testrow", mut); err != nil { 471 t.Fatalf("Mutating row: %v", err) 472 } 473 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 474 if err != nil { 475 t.Fatalf("Reading row: %v", err) 476 } 477 wantRow = Row{"ts": []ReadItem{ 478 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 479 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 480 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 481 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 482 }} 483 if !testutil.Equal(r, wantRow) { 484 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 485 } 486 487 // Check DeleteCellsInFamily 488 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 489 t.Fatalf("Creating column family: %v", err) 490 } 491 492 mut = NewMutation() 493 mut.Set("status", "start", 2000, []byte("2")) 494 mut.Set("status", "end", 3000, []byte("3")) 
495 mut.Set("ts", "col", 1000, []byte("1")) 496 if err := table.Apply(ctx, "row1", mut); err != nil { 497 t.Fatalf("Mutating row: %v", err) 498 } 499 if err := table.Apply(ctx, "row2", mut); err != nil { 500 t.Fatalf("Mutating row: %v", err) 501 } 502 503 mut = NewMutation() 504 mut.DeleteCellsInFamily("status") 505 if err := table.Apply(ctx, "row1", mut); err != nil { 506 t.Fatalf("Delete cf: %v", err) 507 } 508 509 // ColumnFamily removed 510 r, err = table.ReadRow(ctx, "row1") 511 if err != nil { 512 t.Fatalf("Reading row: %v", err) 513 } 514 wantRow = Row{"ts": []ReadItem{ 515 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 516 }} 517 if !testutil.Equal(r, wantRow) { 518 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 519 } 520 521 // ColumnFamily not removed 522 r, err = table.ReadRow(ctx, "row2") 523 if err != nil { 524 t.Fatalf("Reading row: %v", err) 525 } 526 wantRow = Row{ 527 "ts": []ReadItem{ 528 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 529 }, 530 "status": []ReadItem{ 531 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 532 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 533 }, 534 } 535 if !testutil.Equal(r, wantRow) { 536 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 537 } 538 539 // Check DeleteCellsInColumn 540 mut = NewMutation() 541 mut.Set("status", "start", 2000, []byte("2")) 542 mut.Set("status", "middle", 3000, []byte("3")) 543 mut.Set("status", "end", 1000, []byte("1")) 544 if err := table.Apply(ctx, "row3", mut); err != nil { 545 t.Fatalf("Mutating row: %v", err) 546 } 547 mut = NewMutation() 548 mut.DeleteCellsInColumn("status", "middle") 549 if err := table.Apply(ctx, "row3", mut); err != nil { 550 t.Fatalf("Delete column: %v", err) 551 } 552 r, err = table.ReadRow(ctx, "row3") 553 if err != nil { 554 t.Fatalf("Reading row: %v", err) 555 } 556 wantRow = Row{ 557 "status": 
[]ReadItem{ 558 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 559 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 560 }, 561 } 562 if !testutil.Equal(r, wantRow) { 563 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 564 } 565 mut = NewMutation() 566 mut.DeleteCellsInColumn("status", "start") 567 if err := table.Apply(ctx, "row3", mut); err != nil { 568 t.Fatalf("Delete column: %v", err) 569 } 570 r, err = table.ReadRow(ctx, "row3") 571 if err != nil { 572 t.Fatalf("Reading row: %v", err) 573 } 574 wantRow = Row{ 575 "status": []ReadItem{ 576 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 577 }, 578 } 579 if !testutil.Equal(r, wantRow) { 580 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 581 } 582 mut = NewMutation() 583 mut.DeleteCellsInColumn("status", "end") 584 if err := table.Apply(ctx, "row3", mut); err != nil { 585 t.Fatalf("Delete column: %v", err) 586 } 587 r, err = table.ReadRow(ctx, "row3") 588 if err != nil { 589 t.Fatalf("Reading row: %v", err) 590 } 591 if len(r) != 0 { 592 t.Fatalf("Delete column: got %v, want empty row", r) 593 } 594 // Add same cell after delete 595 mut = NewMutation() 596 mut.Set("status", "end", 1000, []byte("1")) 597 if err := table.Apply(ctx, "row3", mut); err != nil { 598 t.Fatalf("Mutating row: %v", err) 599 } 600 r, err = table.ReadRow(ctx, "row3") 601 if err != nil { 602 t.Fatalf("Reading row: %v", err) 603 } 604 if !testutil.Equal(r, wantRow) { 605 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 606 } 607} 608 609func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 610 ctx := context.Background() 611 _, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 612 if err != nil { 613 t.Fatal(err) 614 } 615 defer cleanup() 616 617 if err := populatePresidentsGraph(table); err != nil { 618 t.Fatal(err) 619 } 620 621 if err := 
adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 622 t.Fatalf("Creating column family: %v", err) 623 } 624 625 // Do highly concurrent reads/writes. 626 const maxConcurrency = 1000 627 var wg sync.WaitGroup 628 for i := 0; i < maxConcurrency; i++ { 629 wg.Add(1) 630 go func() { 631 defer wg.Done() 632 switch r := rand.Intn(100); { // r ∈ [0,100) 633 case 0 <= r && r < 30: 634 // Do a read. 635 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 636 if err != nil { 637 t.Errorf("Concurrent read: %v", err) 638 } 639 case 30 <= r && r < 100: 640 // Do a write. 641 mut := NewMutation() 642 mut.Set("ts", "col", 1000, []byte("data")) 643 if err := table.Apply(ctx, "testrow", mut); err != nil { 644 t.Errorf("Concurrent write: %v", err) 645 } 646 } 647 }() 648 } 649 wg.Wait() 650} 651 652func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 653 ctx := context.Background() 654 testEnv, _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 655 if err != nil { 656 t.Fatal(err) 657 } 658 defer cleanup() 659 660 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 661 t.Fatalf("Creating column family: %v", err) 662 } 663 664 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 
665 nonsense := []byte("lorem ipsum dolor sit amet, ") 666 fill(bigBytes, nonsense) 667 mut := NewMutation() 668 mut.Set("ts", "col", 1000, bigBytes) 669 if err := table.Apply(ctx, "bigrow", mut); err != nil { 670 t.Fatalf("Big write: %v", err) 671 } 672 verifyDirectPathRemoteAddress(testEnv, t) 673 r, err := table.ReadRow(ctx, "bigrow") 674 if err != nil { 675 t.Fatalf("Big read: %v", err) 676 } 677 verifyDirectPathRemoteAddress(testEnv, t) 678 wantRow := Row{"ts": []ReadItem{ 679 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 680 }} 681 if !testutil.Equal(r, wantRow) { 682 t.Fatalf("Big read returned incorrect bytes: %v", r) 683 } 684 685 var wg sync.WaitGroup 686 // Now write 1000 rows, each with 82 KB values, then scan them all. 687 medBytes := make([]byte, 82<<10) 688 fill(medBytes, nonsense) 689 sem := make(chan int, 50) // do up to 50 mutations at a time. 690 for i := 0; i < 1000; i++ { 691 mut := NewMutation() 692 mut.Set("ts", "big-scan", 1000, medBytes) 693 row := fmt.Sprintf("row-%d", i) 694 wg.Add(1) 695 go func() { 696 defer wg.Done() 697 defer func() { <-sem }() 698 sem <- 1 699 if err := table.Apply(ctx, row, mut); err != nil { 700 t.Errorf("Preparing large scan: %v", err) 701 } 702 verifyDirectPathRemoteAddress(testEnv, t) 703 }() 704 } 705 wg.Wait() 706 n := 0 707 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 708 for _, ris := range r { 709 for _, ri := range ris { 710 n += len(ri.Value) 711 } 712 } 713 return true 714 }, RowFilter(ColumnFilter("big-scan"))) 715 if err != nil { 716 t.Fatalf("Doing large scan: %v", err) 717 } 718 verifyDirectPathRemoteAddress(testEnv, t) 719 if want := 1000 * len(medBytes); n != want { 720 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 721 } 722 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 
723 rc := 0 724 wantRc := 3 725 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 726 rc++ 727 return true 728 }, LimitRows(int64(wantRc))) 729 if err != nil { 730 t.Fatal(err) 731 } 732 verifyDirectPathRemoteAddress(testEnv, t) 733 if rc != wantRc { 734 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 735 } 736 737 // Test bulk mutations 738 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 739 t.Fatalf("Creating column family: %v", err) 740 } 741 bulkData := map[string][]string{ 742 "red sox": {"2004", "2007", "2013"}, 743 "patriots": {"2001", "2003", "2004", "2014"}, 744 "celtics": {"1981", "1984", "1986", "2008"}, 745 } 746 var rowKeys []string 747 var muts []*Mutation 748 for row, ss := range bulkData { 749 mut := NewMutation() 750 for _, name := range ss { 751 mut.Set("bulk", name, 1000, []byte("1")) 752 } 753 rowKeys = append(rowKeys, row) 754 muts = append(muts, mut) 755 } 756 status, err := table.ApplyBulk(ctx, rowKeys, muts) 757 if err != nil { 758 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 759 } 760 verifyDirectPathRemoteAddress(testEnv, t) 761 if status != nil { 762 t.Fatalf("non-nil errors: %v", err) 763 } 764 765 // Read each row back 766 for rowKey, ss := range bulkData { 767 row, err := table.ReadRow(ctx, rowKey) 768 if err != nil { 769 t.Fatalf("Reading a bulk row: %v", err) 770 } 771 verifyDirectPathRemoteAddress(testEnv, t) 772 var wantItems []ReadItem 773 for _, val := range ss { 774 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 775 } 776 wantRow := Row{"bulk": wantItems} 777 if !testutil.Equal(row, wantRow) { 778 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 779 } 780 } 781 782 // Test bulk write errors. 783 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 
784 badMut := NewMutation() 785 badMut.Set("badfamily", "col", ServerTime, nil) 786 badMut2 := NewMutation() 787 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 788 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 789 if err != nil { 790 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 791 } 792 verifyDirectPathRemoteAddress(testEnv, t) 793 if status == nil { 794 t.Fatalf("No errors for bad bulk mutation") 795 } else if status[0] == nil || status[1] == nil { 796 t.Fatalf("No error for bad bulk mutation") 797 } 798} 799 800func TestIntegration_Read(t *testing.T) { 801 ctx := context.Background() 802 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 803 if err != nil { 804 t.Fatal(err) 805 } 806 defer cleanup() 807 808 // Insert some data. 809 initialData := map[string][]string{ 810 "wmckinley": {"tjefferson"}, 811 "gwashington": {"jadams"}, 812 "tjefferson": {"gwashington", "jadams", "wmckinley"}, 813 "jadams": {"gwashington", "tjefferson"}, 814 } 815 for row, ss := range initialData { 816 mut := NewMutation() 817 for _, name := range ss { 818 mut.Set("follows", name, 1000, []byte("1")) 819 } 820 if err := table.Apply(ctx, row, mut); err != nil { 821 t.Fatalf("Mutating row %q: %v", row, err) 822 } 823 verifyDirectPathRemoteAddress(testEnv, t) 824 } 825 826 for _, test := range []struct { 827 desc string 828 rr RowSet 829 filter Filter // may be nil 830 limit ReadOption // may be nil 831 832 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 833 // and join with a comma. 
834 want string 835 wantLabels []string 836 }{ 837 { 838 desc: "read all, unfiltered", 839 rr: RowRange{}, 840 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 841 }, 842 { 843 desc: "read with InfiniteRange, unfiltered", 844 rr: InfiniteRange("tjefferson"), 845 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 846 }, 847 { 848 desc: "read with NewRange, unfiltered", 849 rr: NewRange("gargamel", "hubbard"), 850 want: "gwashington-jadams-1", 851 }, 852 { 853 desc: "read with PrefixRange, unfiltered", 854 rr: PrefixRange("jad"), 855 want: "jadams-gwashington-1,jadams-tjefferson-1", 856 }, 857 { 858 desc: "read with SingleRow, unfiltered", 859 rr: SingleRow("wmckinley"), 860 want: "wmckinley-tjefferson-1", 861 }, 862 { 863 desc: "read all, with ColumnFilter", 864 rr: RowRange{}, 865 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 866 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 867 }, 868 { 869 desc: "read all, with ColumnFilter, prefix", 870 rr: RowRange{}, 871 filter: ColumnFilter("j"), // no matches 872 want: "", 873 }, 874 { 875 desc: "read range, with ColumnRangeFilter", 876 rr: RowRange{}, 877 filter: ColumnRangeFilter("follows", "h", "k"), 878 want: "gwashington-jadams-1,tjefferson-jadams-1", 879 }, 880 { 881 desc: "read range from empty, with ColumnRangeFilter", 882 rr: RowRange{}, 883 filter: ColumnRangeFilter("follows", "", "u"), 884 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 885 }, 886 { 887 desc: "read range from start to empty, with ColumnRangeFilter", 888 rr: RowRange{}, 889 filter: ColumnRangeFilter("follows", "h", ""), 890 want: 
"gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 891 }, 892 { 893 desc: "read with RowKeyFilter", 894 rr: RowRange{}, 895 filter: RowKeyFilter(".*wash.*"), 896 want: "gwashington-jadams-1", 897 }, 898 { 899 desc: "read with RowKeyFilter, prefix", 900 rr: RowRange{}, 901 filter: RowKeyFilter("gwash"), 902 want: "", 903 }, 904 { 905 desc: "read with RowKeyFilter, no matches", 906 rr: RowRange{}, 907 filter: RowKeyFilter(".*xxx.*"), 908 want: "", 909 }, 910 { 911 desc: "read with FamilyFilter, no matches", 912 rr: RowRange{}, 913 filter: FamilyFilter(".*xxx.*"), 914 want: "", 915 }, 916 { 917 desc: "read with ColumnFilter + row limit", 918 rr: RowRange{}, 919 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 920 limit: LimitRows(2), 921 want: "gwashington-jadams-1,jadams-tjefferson-1", 922 }, 923 { 924 desc: "apply labels to the result rows", 925 rr: RowRange{}, 926 filter: LabelFilter("test-label"), 927 limit: LimitRows(2), 928 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 929 wantLabels: []string{"test-label", "test-label", "test-label"}, 930 }, 931 { 932 desc: "read all, strip values", 933 rr: RowRange{}, 934 filter: StripValueFilter(), 935 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 936 }, 937 { 938 desc: "read with ColumnFilter + row limit + strip values", 939 rr: RowRange{}, 940 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" 941 limit: LimitRows(2), 942 want: "gwashington-jadams-,jadams-tjefferson-", 943 }, 944 { 945 desc: "read with condition, strip values on true", 946 rr: RowRange{}, 947 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 948 want: 
"gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 949 }, 950 { 951 desc: "read with condition, strip values on false", 952 rr: RowRange{}, 953 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 954 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 955 }, 956 { 957 desc: "read with ValueRangeFilter + row limit", 958 rr: RowRange{}, 959 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 960 limit: LimitRows(2), 961 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 962 }, 963 { 964 desc: "read with ValueRangeFilter, no match on exclusive end", 965 rr: RowRange{}, 966 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 967 want: "", 968 }, 969 { 970 desc: "read with ValueRangeFilter, no matches", 971 rr: RowRange{}, 972 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 973 want: "", 974 }, 975 { 976 desc: "read with InterleaveFilter, no matches on all filters", 977 rr: RowRange{}, 978 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 979 want: "", 980 }, 981 { 982 desc: "read with InterleaveFilter, no duplicate cells", 983 rr: RowRange{}, 984 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 985 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 986 }, 987 { 988 desc: "read with InterleaveFilter, with duplicate cells", 989 rr: RowRange{}, 990 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 991 want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 992 }, 993 { 994 desc: "read with a RowRangeList and no filter", 995 rr: RowRangeList{NewRange("gargamel", 
"hubbard"), InfiniteRange("wmckinley")}, 996 want: "gwashington-jadams-1,wmckinley-tjefferson-1", 997 }, 998 { 999 desc: "chain that excludes rows and matches nothing, in a condition", 1000 rr: RowRange{}, 1001 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 1002 want: "", 1003 }, 1004 { 1005 desc: "chain that ends with an interleave that has no match. covers #804", 1006 rr: RowRange{}, 1007 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 1008 want: "", 1009 }, 1010 } { 1011 t.Run(test.desc, func(t *testing.T) { 1012 var opts []ReadOption 1013 if test.filter != nil { 1014 opts = append(opts, RowFilter(test.filter)) 1015 } 1016 if test.limit != nil { 1017 opts = append(opts, test.limit) 1018 } 1019 var elt, labels []string 1020 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 1021 for _, ris := range r { 1022 for _, ri := range ris { 1023 labels = append(labels, ri.Labels...) 1024 elt = append(elt, formatReadItem(ri)) 1025 } 1026 } 1027 return true 1028 }, opts...) 1029 if err != nil { 1030 t.Fatal(err) 1031 } 1032 verifyDirectPathRemoteAddress(testEnv, t) 1033 if got := strings.Join(elt, ","); got != test.want { 1034 t.Fatalf("got %q\nwant %q", got, test.want) 1035 } 1036 if got, want := labels, test.wantLabels; !reflect.DeepEqual(got, want) { 1037 t.Fatalf("got %q\nwant %q", got, want) 1038 } 1039 }) 1040 } 1041} 1042 1043func TestIntegration_SampleRowKeys(t *testing.T) { 1044 ctx := context.Background() 1045 testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t) 1046 if err != nil { 1047 t.Fatal(err) 1048 } 1049 defer cleanup() 1050 1051 // Insert some data. 
1052 initialData := map[string][]string{ 1053 "wmckinley11": {"tjefferson11"}, 1054 "gwashington77": {"jadams77"}, 1055 "tjefferson0": {"gwashington0", "jadams0"}, 1056 } 1057 1058 for row, ss := range initialData { 1059 mut := NewMutation() 1060 for _, name := range ss { 1061 mut.Set("follows", name, 1000, []byte("1")) 1062 } 1063 if err := table.Apply(ctx, row, mut); err != nil { 1064 t.Fatalf("Mutating row %q: %v", row, err) 1065 } 1066 verifyDirectPathRemoteAddress(testEnv, t) 1067 } 1068 sampleKeys, err := table.SampleRowKeys(context.Background()) 1069 if err != nil { 1070 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1071 } 1072 if len(sampleKeys) == 0 { 1073 t.Error("SampleRowKeys length 0") 1074 } 1075} 1076 1077func TestIntegration_Admin(t *testing.T) { 1078 testEnv, err := NewIntegrationEnv() 1079 if err != nil { 1080 t.Fatalf("IntegrationEnv: %v", err) 1081 } 1082 defer testEnv.Close() 1083 1084 timeout := 2 * time.Second 1085 if testEnv.Config().UseProd { 1086 timeout = 5 * time.Minute 1087 } 1088 ctx, _ := context.WithTimeout(context.Background(), timeout) 1089 1090 adminClient, err := testEnv.NewAdminClient() 1091 if err != nil { 1092 t.Fatalf("NewAdminClient: %v", err) 1093 } 1094 defer adminClient.Close() 1095 1096 iAdminClient, err := testEnv.NewInstanceAdminClient() 1097 if err != nil { 1098 t.Fatalf("NewInstanceAdminClient: %v", err) 1099 } 1100 if iAdminClient != nil { 1101 defer iAdminClient.Close() 1102 1103 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1104 if err != nil { 1105 t.Errorf("InstanceInfo: %v", err) 1106 } 1107 if iInfo.Name != adminClient.instance { 1108 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1109 } 1110 } 1111 1112 list := func() []string { 1113 tbls, err := adminClient.Tables(ctx) 1114 if err != nil { 1115 t.Fatalf("Fetching list of tables: %v", err) 1116 } 1117 sort.Strings(tbls) 1118 return tbls 1119 } 1120 containsAll := func(got, want []string) bool { 1121 
gotSet := make(map[string]bool) 1122 1123 for _, s := range got { 1124 gotSet[s] = true 1125 } 1126 for _, s := range want { 1127 if !gotSet[s] { 1128 return false 1129 } 1130 } 1131 return true 1132 } 1133 1134 defer deleteTable(ctx, t, adminClient, "mytable") 1135 1136 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1137 t.Fatalf("Creating table: %v", err) 1138 } 1139 1140 defer deleteTable(ctx, t, adminClient, "myothertable") 1141 1142 if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { 1143 t.Fatalf("Creating table: %v", err) 1144 } 1145 1146 if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { 1147 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1148 } 1149 1150 must(adminClient.WaitForReplication(ctx, "mytable")) 1151 1152 if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { 1153 t.Fatalf("Deleting table: %v", err) 1154 } 1155 tables := list() 1156 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1157 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1158 } 1159 if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { 1160 t.Errorf("adminClient.Tables return %#v. 
unwanted %#v", got, unwanted) 1161 } 1162 1163 tblConf := TableConf{ 1164 TableID: "conftable", 1165 Families: map[string]GCPolicy{ 1166 "fam1": MaxVersionsPolicy(1), 1167 "fam2": MaxVersionsPolicy(2), 1168 }, 1169 } 1170 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1171 t.Fatalf("Creating table from TableConf: %v", err) 1172 } 1173 defer deleteTable(ctx, t, adminClient, tblConf.TableID) 1174 1175 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1176 if err != nil { 1177 t.Fatalf("Getting table info: %v", err) 1178 } 1179 sort.Strings(tblInfo.Families) 1180 wantFams := []string{"fam1", "fam2"} 1181 if !testutil.Equal(tblInfo.Families, wantFams) { 1182 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1183 } 1184 1185 // Populate mytable and drop row ranges 1186 if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { 1187 t.Fatalf("Creating column family: %v", err) 1188 } 1189 1190 client, err := testEnv.NewClient() 1191 if err != nil { 1192 t.Fatalf("NewClient: %v", err) 1193 } 1194 defer client.Close() 1195 1196 tbl := client.Open("mytable") 1197 1198 prefixes := []string{"a", "b", "c"} 1199 for _, prefix := range prefixes { 1200 for i := 0; i < 5; i++ { 1201 mut := NewMutation() 1202 mut.Set("cf", "col", 1000, []byte("1")) 1203 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1204 t.Fatalf("Mutating row: %v", err) 1205 } 1206 } 1207 } 1208 1209 if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { 1210 t.Errorf("DropRowRange a: %v", err) 1211 } 1212 if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { 1213 t.Errorf("DropRowRange c: %v", err) 1214 } 1215 if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { 1216 t.Errorf("DropRowRange x: %v", err) 1217 } 1218 1219 var gotRowCount int 1220 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1221 gotRowCount++ 1222 if !strings.HasPrefix(row.Key(), "b") { 
1223 t.Errorf("Invalid row after dropping range: %v", row) 1224 } 1225 return true 1226 })) 1227 if gotRowCount != 5 { 1228 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1229 } 1230 1231 if err = adminClient.DropAllRows(ctx, "mytable"); err != nil { 1232 t.Errorf("DropAllRows mytable: %v", err) 1233 } 1234 1235 gotRowCount = 0 1236 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1237 gotRowCount++ 1238 return true 1239 })) 1240 if gotRowCount != 0 { 1241 t.Errorf("Invalid row count after truncating table: got %v, want %v", gotRowCount, 0) 1242 } 1243} 1244 1245func TestIntegration_TableIam(t *testing.T) { 1246 testEnv, err := NewIntegrationEnv() 1247 if err != nil { 1248 t.Fatalf("IntegrationEnv: %v", err) 1249 } 1250 defer testEnv.Close() 1251 1252 if !testEnv.Config().UseProd { 1253 t.Skip("emulator doesn't support IAM Policy creation") 1254 } 1255 1256 timeout := 5 * time.Minute 1257 ctx, _ := context.WithTimeout(context.Background(), timeout) 1258 1259 adminClient, err := testEnv.NewAdminClient() 1260 if err != nil { 1261 t.Fatalf("NewAdminClient: %v", err) 1262 } 1263 defer adminClient.Close() 1264 1265 defer deleteTable(ctx, t, adminClient, "mytable") 1266 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1267 t.Fatalf("Creating table: %v", err) 1268 } 1269 1270 // Verify that the IAM Controls work for Tables. 
1271 iamHandle := adminClient.TableIAM("mytable") 1272 p, err := iamHandle.Policy(ctx) 1273 if err != nil { 1274 t.Errorf("Iam GetPolicy mytable: %v", err) 1275 } 1276 if err = iamHandle.SetPolicy(ctx, p); err != nil { 1277 t.Errorf("Iam SetPolicy mytable: %v", err) 1278 } 1279 if _, err = iamHandle.TestPermissions(ctx, []string{"bigtable.tables.get"}); err != nil { 1280 t.Errorf("Iam TestPermissions mytable: %v", err) 1281 } 1282} 1283 1284func TestIntegration_BackupIAM(t *testing.T) { 1285 testEnv, err := NewIntegrationEnv() 1286 if err != nil { 1287 t.Fatalf("IntegrationEnv: %v", err) 1288 } 1289 defer testEnv.Close() 1290 1291 if !testEnv.Config().UseProd { 1292 t.Skip("emulator doesn't support IAM Policy creation") 1293 } 1294 timeout := 5 * time.Minute 1295 ctx, _ := context.WithTimeout(context.Background(), timeout) 1296 1297 adminClient, err := testEnv.NewAdminClient() 1298 if err != nil { 1299 t.Fatalf("NewAdminClient: %v", err) 1300 } 1301 defer adminClient.Close() 1302 1303 table := testEnv.Config().Table 1304 cluster := testEnv.Config().Cluster 1305 1306 defer deleteTable(ctx, t, adminClient, table) 1307 if err := adminClient.CreateTable(ctx, table); err != nil { 1308 t.Fatalf("Creating table: %v", err) 1309 } 1310 // Create backup. 1311 backup := "backup" 1312 defer adminClient.DeleteBackup(ctx, cluster, backup) 1313 if err = adminClient.CreateBackup(ctx, table, cluster, backup, time.Now().Add(8*time.Hour)); err != nil { 1314 t.Fatalf("Creating backup: %v", err) 1315 } 1316 iamHandle := adminClient.BackupIAM(cluster, backup) 1317 // Get backup policy. 1318 p, err := iamHandle.Policy(ctx) 1319 if err != nil { 1320 t.Errorf("iamHandle.Policy: %v", err) 1321 } 1322 // The resource is new, so the policy should be empty. 1323 if got := p.Roles(); len(got) > 0 { 1324 t.Errorf("got roles %v, want none", got) 1325 } 1326 // Set backup policy. 1327 member := "domain:google.com" 1328 // Add a member, set the policy, then check that the member is present. 
1329 p.Add(member, iam.Viewer) 1330 if err = iamHandle.SetPolicy(ctx, p); err != nil { 1331 t.Errorf("iamHandle.SetPolicy: %v", err) 1332 } 1333 p, err = iamHandle.Policy(ctx) 1334 if err != nil { 1335 t.Errorf("iamHandle.Policy: %v", err) 1336 } 1337 if got, want := p.Members(iam.Viewer), []string{member}; !testutil.Equal(got, want) { 1338 t.Errorf("iamHandle.Policy: got %v, want %v", got, want) 1339 } 1340 // Test backup permissions. 1341 permissions := []string{"bigtable.backups.get", "bigtable.backups.update"} 1342 _, err = iamHandle.TestPermissions(ctx, permissions) 1343 if err != nil { 1344 t.Errorf("iamHandle.TestPermissions: %v", err) 1345 } 1346} 1347 1348func TestIntegration_AdminCreateInstance(t *testing.T) { 1349 if instanceToCreate == "" { 1350 t.Skip("instanceToCreate not set, skipping instance creation testing") 1351 } 1352 1353 testEnv, err := NewIntegrationEnv() 1354 if err != nil { 1355 t.Fatalf("IntegrationEnv: %v", err) 1356 } 1357 defer testEnv.Close() 1358 1359 if !testEnv.Config().UseProd { 1360 t.Skip("emulator doesn't support instance creation") 1361 } 1362 1363 timeout := 5 * time.Minute 1364 ctx, _ := context.WithTimeout(context.Background(), timeout) 1365 1366 iAdminClient, err := testEnv.NewInstanceAdminClient() 1367 if err != nil { 1368 t.Fatalf("NewInstanceAdminClient: %v", err) 1369 } 1370 defer iAdminClient.Close() 1371 1372 clusterID := instanceToCreate + "-cluster" 1373 1374 // Create a development instance 1375 conf := &InstanceConf{ 1376 InstanceId: instanceToCreate, 1377 ClusterId: clusterID, 1378 DisplayName: "test instance", 1379 Zone: instanceToCreateZone, 1380 InstanceType: DEVELOPMENT, 1381 Labels: map[string]string{"test-label-key": "test-label-value"}, 1382 } 1383 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1384 t.Fatalf("CreateInstance: %v", err) 1385 } 1386 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1387 1388 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1389 if err != 
nil { 1390 t.Fatalf("InstanceInfo: %v", err) 1391 } 1392 1393 // Basic return values are tested elsewhere, check instance type 1394 if iInfo.InstanceType != DEVELOPMENT { 1395 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1396 } 1397 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1398 t.Fatalf("Labels: %v, want: %v", got, want) 1399 } 1400 1401 // Update everything we can about the instance in one call. 1402 confWithClusters := &InstanceWithClustersConfig{ 1403 InstanceID: instanceToCreate, 1404 DisplayName: "new display name", 1405 InstanceType: PRODUCTION, 1406 Labels: map[string]string{"new-label-key": "new-label-value"}, 1407 Clusters: []ClusterConfig{ 1408 {ClusterID: clusterID, NumNodes: 5}}, 1409 } 1410 1411 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1412 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1413 } 1414 1415 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1416 if err != nil { 1417 t.Fatalf("InstanceInfo: %v", err) 1418 } 1419 1420 if iInfo.InstanceType != PRODUCTION { 1421 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1422 } 1423 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1424 t.Fatalf("Labels: %v, want: %v", got, want) 1425 } 1426 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1427 t.Fatalf("Display name: %q, want: %q", got, want) 1428 } 1429 1430 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1431 if err != nil { 1432 t.Fatalf("GetCluster: %v", err) 1433 } 1434 1435 if cInfo.ServeNodes != 5 { 1436 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1437 } 1438} 1439 1440func TestIntegration_AdminUpdateInstanceLabels(t *testing.T) { 1441 1442 // Check the environments 1443 if instanceToCreate == "" { 1444 t.Skip("instanceToCreate not set, skipping instance creation testing") 1445 } 1446 testEnv, err := NewIntegrationEnv() 1447 if err != 
nil { 1448 t.Fatalf("IntegrationEnv: %v", err) 1449 } 1450 defer testEnv.Close() 1451 if !testEnv.Config().UseProd { 1452 t.Skip("emulator doesn't support instance creation") 1453 } 1454 1455 // Create an instance admin client 1456 timeout := 5 * time.Minute 1457 ctx, _ := context.WithTimeout(context.Background(), timeout) 1458 iAdminClient, err := testEnv.NewInstanceAdminClient() 1459 if err != nil { 1460 t.Fatalf("NewInstanceAdminClient: %v", err) 1461 } 1462 defer iAdminClient.Close() 1463 1464 // Create a test instance 1465 conf := &InstanceConf{ 1466 InstanceId: instanceToCreate, 1467 ClusterId: instanceToCreate + "-cluster", 1468 DisplayName: "test instance", 1469 InstanceType: DEVELOPMENT, 1470 Zone: instanceToCreateZone, 1471 } 1472 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1473 t.Fatalf("CreateInstance: %v", err) 1474 } 1475 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1476 1477 // Check the created test instances 1478 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1479 if err != nil { 1480 t.Fatalf("InstanceInfo: %v", err) 1481 } 1482 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1483 t.Fatalf("Labels: %v, want: %v", got, want) 1484 } 1485 1486 // Test patterns to update instance labels 1487 tests := []struct { 1488 name string 1489 in map[string]string 1490 out map[string]string 1491 }{ 1492 { 1493 name: "update labels", 1494 in: map[string]string{"test-label-key": "test-label-value"}, 1495 out: map[string]string{"test-label-key": "test-label-value"}, 1496 }, 1497 { 1498 name: "update multiple labels", 1499 in: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1500 out: map[string]string{"update-label-key-a": "update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1501 }, 1502 { 1503 name: "not update existing labels", 1504 in: nil, // nil map 1505 out: map[string]string{"update-label-key-a": 
"update-label-value-a", "update-label-key-b": "update-label-value-b"}, 1506 }, 1507 { 1508 name: "delete labels", 1509 in: map[string]string{}, // empty map 1510 out: nil, 1511 }, 1512 } 1513 for _, tt := range tests { 1514 t.Run(tt.name, func(t *testing.T) { 1515 confWithClusters := &InstanceWithClustersConfig{ 1516 InstanceID: instanceToCreate, 1517 Labels: tt.in, 1518 } 1519 if err := iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1520 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1521 } 1522 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1523 if err != nil { 1524 t.Fatalf("InstanceInfo: %v", err) 1525 } 1526 if got, want := iInfo.Labels, tt.out; !cmp.Equal(got, want) { 1527 t.Fatalf("Labels: %v, want: %v", got, want) 1528 } 1529 }) 1530 } 1531} 1532 1533func TestIntegration_AdminUpdateInstanceAndSyncClusters(t *testing.T) { 1534 if instanceToCreate == "" { 1535 t.Skip("instanceToCreate not set, skipping instance update testing") 1536 } 1537 1538 testEnv, err := NewIntegrationEnv() 1539 if err != nil { 1540 t.Fatalf("IntegrationEnv: %v", err) 1541 } 1542 defer testEnv.Close() 1543 1544 if !testEnv.Config().UseProd { 1545 t.Skip("emulator doesn't support instance creation") 1546 } 1547 1548 timeout := 5 * time.Minute 1549 ctx, _ := context.WithTimeout(context.Background(), timeout) 1550 1551 iAdminClient, err := testEnv.NewInstanceAdminClient() 1552 if err != nil { 1553 t.Fatalf("NewInstanceAdminClient: %v", err) 1554 } 1555 defer iAdminClient.Close() 1556 1557 clusterID := instanceToCreate + "-cluster" 1558 1559 // Create a development instance 1560 conf := &InstanceConf{ 1561 InstanceId: instanceToCreate, 1562 ClusterId: clusterID, 1563 DisplayName: "test instance", 1564 Zone: instanceToCreateZone, 1565 InstanceType: DEVELOPMENT, 1566 Labels: map[string]string{"test-label-key": "test-label-value"}, 1567 } 1568 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1569 t.Fatalf("CreateInstance: %v", err) 
1570 } 1571 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1572 1573 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1574 if err != nil { 1575 t.Fatalf("InstanceInfo: %v", err) 1576 } 1577 1578 // Basic return values are tested elsewhere, check instance type 1579 if iInfo.InstanceType != DEVELOPMENT { 1580 t.Fatalf("Instance is not DEVELOPMENT: %v", iInfo.InstanceType) 1581 } 1582 if got, want := iInfo.Labels, conf.Labels; !cmp.Equal(got, want) { 1583 t.Fatalf("Labels: %v, want: %v", got, want) 1584 } 1585 1586 // Update everything we can about the instance in one call. 1587 confWithClusters := &InstanceWithClustersConfig{ 1588 InstanceID: instanceToCreate, 1589 DisplayName: "new display name", 1590 InstanceType: PRODUCTION, 1591 Labels: map[string]string{"new-label-key": "new-label-value"}, 1592 Clusters: []ClusterConfig{ 1593 {ClusterID: clusterID, NumNodes: 5}}, 1594 } 1595 1596 results, err := UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1597 if err != nil { 1598 t.Fatalf("UpdateInstanceAndSyncClusters: %v", err) 1599 } 1600 1601 wantResults := UpdateInstanceResults{ 1602 InstanceUpdated: true, 1603 UpdatedClusters: []string{clusterID}, 1604 } 1605 if diff := testutil.Diff(*results, wantResults); diff != "" { 1606 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1607 } 1608 1609 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1610 if err != nil { 1611 t.Fatalf("InstanceInfo: %v", err) 1612 } 1613 1614 if iInfo.InstanceType != PRODUCTION { 1615 t.Fatalf("Instance type is not PRODUCTION: %v", iInfo.InstanceType) 1616 } 1617 if got, want := iInfo.Labels, confWithClusters.Labels; !cmp.Equal(got, want) { 1618 t.Fatalf("Labels: %v, want: %v", got, want) 1619 } 1620 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1621 t.Fatalf("Display name: %q, want: %q", got, want) 1622 } 1623 1624 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1625 if err != 
nil { 1626 t.Fatalf("GetCluster: %v", err) 1627 } 1628 1629 if cInfo.ServeNodes != 5 { 1630 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1631 } 1632 1633 // Now add a second cluster as the only change. The first cluster must also be provided so 1634 // it is not removed. 1635 clusterID2 := clusterID + "-2" 1636 confWithClusters = &InstanceWithClustersConfig{ 1637 InstanceID: instanceToCreate, 1638 Clusters: []ClusterConfig{ 1639 {ClusterID: clusterID}, 1640 {ClusterID: clusterID2, NumNodes: 3, StorageType: SSD, Zone: instanceToCreateZone2}}, 1641 } 1642 1643 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1644 if err != nil { 1645 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1646 } 1647 1648 wantResults = UpdateInstanceResults{ 1649 InstanceUpdated: false, 1650 CreatedClusters: []string{clusterID2}, 1651 } 1652 if diff := testutil.Diff(*results, wantResults); diff != "" { 1653 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1654 } 1655 1656 // Now update one cluster and delete the other 1657 confWithClusters = &InstanceWithClustersConfig{ 1658 InstanceID: instanceToCreate, 1659 Clusters: []ClusterConfig{ 1660 {ClusterID: clusterID, NumNodes: 4}}, 1661 } 1662 1663 results, err = UpdateInstanceAndSyncClusters(ctx, iAdminClient, confWithClusters) 1664 if err != nil { 1665 t.Fatalf("UpdateInstanceAndSyncClusters: %v %v", confWithClusters, err) 1666 } 1667 1668 wantResults = UpdateInstanceResults{ 1669 InstanceUpdated: false, 1670 UpdatedClusters: []string{clusterID}, 1671 DeletedClusters: []string{clusterID2}, 1672 } 1673 1674 if diff := testutil.Diff(*results, wantResults); diff != "" { 1675 t.Fatalf("UpdateInstanceResults: got - want +\n%s", diff) 1676 } 1677 1678 // Make sure the instance looks as we would expect 1679 clusters, err := iAdminClient.Clusters(ctx, conf.InstanceId) 1680 if err != nil { 1681 t.Fatalf("Clusters: %v", err) 1682 } 1683 1684 if len(clusters) != 1 { 1685 
t.Fatalf("Clusters length %v, want: 1", len(clusters)) 1686 } 1687 1688 wantCluster := &ClusterInfo{ 1689 Name: clusterID, 1690 Zone: instanceToCreateZone, 1691 ServeNodes: 4, 1692 State: "READY", 1693 } 1694 if diff := testutil.Diff(clusters[0], wantCluster); diff != "" { 1695 t.Fatalf("InstanceEquality: got - want +\n%s", diff) 1696 } 1697} 1698 1699func TestIntegration_AdminSnapshot(t *testing.T) { 1700 testEnv, err := NewIntegrationEnv() 1701 if err != nil { 1702 t.Fatalf("IntegrationEnv: %v", err) 1703 } 1704 defer testEnv.Close() 1705 1706 if !testEnv.Config().UseProd { 1707 t.Skip("emulator doesn't support snapshots") 1708 } 1709 1710 timeout := 2 * time.Second 1711 if testEnv.Config().UseProd { 1712 timeout = 5 * time.Minute 1713 } 1714 ctx, _ := context.WithTimeout(context.Background(), timeout) 1715 1716 adminClient, err := testEnv.NewAdminClient() 1717 if err != nil { 1718 t.Fatalf("NewAdminClient: %v", err) 1719 } 1720 defer adminClient.Close() 1721 1722 table := testEnv.Config().Table 1723 cluster := testEnv.Config().Cluster 1724 1725 list := func(cluster string) ([]*SnapshotInfo, error) { 1726 infos := []*SnapshotInfo(nil) 1727 1728 it := adminClient.Snapshots(ctx, cluster) 1729 for { 1730 s, err := it.Next() 1731 if err == iterator.Done { 1732 break 1733 } 1734 if err != nil { 1735 return nil, err 1736 } 1737 infos = append(infos, s) 1738 } 1739 return infos, err 1740 } 1741 1742 // Delete the table at the end of the test. 
Schedule ahead of time 1743 // in case the client fails 1744 defer deleteTable(ctx, t, adminClient, table) 1745 1746 if err := adminClient.CreateTable(ctx, table); err != nil { 1747 t.Fatalf("Creating table: %v", err) 1748 } 1749 1750 // Precondition: no snapshots 1751 snapshots, err := list(cluster) 1752 if err != nil { 1753 t.Fatalf("Initial snapshot list: %v", err) 1754 } 1755 if got, want := len(snapshots), 0; got != want { 1756 t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) 1757 } 1758 1759 // Create snapshot 1760 defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") 1761 1762 if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { 1763 t.Fatalf("Creating snaphot: %v", err) 1764 } 1765 1766 // List snapshot 1767 snapshots, err = list(cluster) 1768 if err != nil { 1769 t.Fatalf("Listing snapshots: %v", err) 1770 } 1771 if got, want := len(snapshots), 1; got != want { 1772 t.Fatalf("Listing snapshot count: %d, want: %d", got, want) 1773 } 1774 if got, want := snapshots[0].Name, "mysnapshot"; got != want { 1775 t.Fatalf("Snapshot name: %s, want: %s", got, want) 1776 } 1777 if got, want := snapshots[0].SourceTable, table; got != want { 1778 t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) 1779 } 1780 if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 1781 t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) 1782 } 1783 1784 // Get snapshot 1785 snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") 1786 if err != nil { 1787 t.Fatalf("SnapshotInfo: %v", snapshot) 1788 } 1789 if got, want := *snapshot, *snapshots[0]; got != want { 1790 t.Fatalf("SnapshotInfo: %v, want: %v", got, want) 1791 } 1792 1793 // Restore 1794 restoredTable := table + "-restored" 1795 defer deleteTable(ctx, t, adminClient, restoredTable) 1796 if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); 
err != nil { 1797 t.Fatalf("CreateTableFromSnapshot: %v", err) 1798 } 1799 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 1800 t.Fatalf("Restored TableInfo: %v", err) 1801 } 1802 1803 // Delete snapshot 1804 if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { 1805 t.Fatalf("DeleteSnapshot: %v", err) 1806 } 1807 snapshots, err = list(cluster) 1808 if err != nil { 1809 t.Fatalf("List after Delete: %v", err) 1810 } 1811 if got, want := len(snapshots), 0; got != want { 1812 t.Fatalf("List after delete len: %d, want: %d", got, want) 1813 } 1814} 1815 1816// instanceAdminClientMock is used to test FailedLocations field processing. 1817type instanceAdminClientMock struct { 1818 Clusters []*btapb.Cluster 1819 UnavailableLocations []string 1820} 1821 1822func (iacm *instanceAdminClientMock) ListClusters(ctx context.Context, req *btapb.ListClustersRequest, opts ...grpc.CallOption) (*btapb.ListClustersResponse, error) { 1823 res := btapb.ListClustersResponse{ 1824 Clusters: iacm.Clusters, 1825 FailedLocations: iacm.UnavailableLocations, 1826 } 1827 return &res, nil 1828} 1829 1830func (iacm *instanceAdminClientMock) CreateInstance(ctx context.Context, in *btapb.CreateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1831 return nil, nil 1832} 1833func (iacm *instanceAdminClientMock) GetInstance(ctx context.Context, in *btapb.GetInstanceRequest, opts ...grpc.CallOption) (*btapb.Instance, error) { 1834 return nil, nil 1835} 1836func (iacm *instanceAdminClientMock) ListInstances(ctx context.Context, in *btapb.ListInstancesRequest, opts ...grpc.CallOption) (*btapb.ListInstancesResponse, error) { 1837 return nil, nil 1838} 1839func (iacm *instanceAdminClientMock) UpdateInstance(ctx context.Context, in *btapb.Instance, opts ...grpc.CallOption) (*btapb.Instance, error) { 1840 return nil, nil 1841} 1842func (iacm *instanceAdminClientMock) PartialUpdateInstance(ctx context.Context, in 
*btapb.PartialUpdateInstanceRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1843 return nil, nil 1844} 1845func (iacm *instanceAdminClientMock) DeleteInstance(ctx context.Context, in *btapb.DeleteInstanceRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 1846 return nil, nil 1847} 1848func (iacm *instanceAdminClientMock) CreateCluster(ctx context.Context, in *btapb.CreateClusterRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1849 return nil, nil 1850} 1851func (iacm *instanceAdminClientMock) GetCluster(ctx context.Context, in *btapb.GetClusterRequest, opts ...grpc.CallOption) (*btapb.Cluster, error) { 1852 return nil, nil 1853} 1854func (iacm *instanceAdminClientMock) UpdateCluster(ctx context.Context, in *btapb.Cluster, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1855 return nil, nil 1856} 1857func (iacm *instanceAdminClientMock) DeleteCluster(ctx context.Context, in *btapb.DeleteClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { 1858 return nil, nil 1859} 1860func (iacm *instanceAdminClientMock) CreateAppProfile(ctx context.Context, in *btapb.CreateAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) { 1861 return nil, nil 1862} 1863func (iacm *instanceAdminClientMock) GetAppProfile(ctx context.Context, in *btapb.GetAppProfileRequest, opts ...grpc.CallOption) (*btapb.AppProfile, error) { 1864 return nil, nil 1865} 1866func (iacm *instanceAdminClientMock) ListAppProfiles(ctx context.Context, in *btapb.ListAppProfilesRequest, opts ...grpc.CallOption) (*btapb.ListAppProfilesResponse, error) { 1867 return nil, nil 1868} 1869func (iacm *instanceAdminClientMock) UpdateAppProfile(ctx context.Context, in *btapb.UpdateAppProfileRequest, opts ...grpc.CallOption) (*longrunning.Operation, error) { 1870 return nil, nil 1871} 1872func (iacm *instanceAdminClientMock) DeleteAppProfile(ctx context.Context, in *btapb.DeleteAppProfileRequest, opts ...grpc.CallOption) (*emptypb.Empty, 
error) {
	return nil, nil
}

// GetIamPolicy is a stub that always returns (nil, nil).
// NOTE(review): presumably present only so the mock satisfies the instance
// admin gRPC client interface — confirm against the interface definition.
func (iacm *instanceAdminClientMock) GetIamPolicy(ctx context.Context, in *v1.GetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
	return nil, nil
}

// SetIamPolicy is a stub that always returns (nil, nil).
func (iacm *instanceAdminClientMock) SetIamPolicy(ctx context.Context, in *v1.SetIamPolicyRequest, opts ...grpc.CallOption) (*v1.Policy, error) {
	return nil, nil
}

// TestIamPermissions is a stub that always returns (nil, nil).
func (iacm *instanceAdminClientMock) TestIamPermissions(ctx context.Context, in *v1.TestIamPermissionsRequest, opts ...grpc.CallOption) (*v1.TestIamPermissionsResponse, error) {
	return nil, nil
}

// TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations swaps the
// instance admin gRPC client for instanceAdminClientMock and checks that
// Clusters returns both the listed cluster and an ErrPartiallyUnavailable
// error carrying the unavailable location.
func TestIntegration_InstanceAdminClient_Clusters_WithFailedLocations(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	if !testEnv.Config().UseProd {
		t.Skip("emulator doesn't support snapshots")
	}

	iAdminClient, err := testEnv.NewInstanceAdminClient()
	if err != nil {
		t.Fatalf("NewInstanceAdminClient: %v", err)
	}
	defer iAdminClient.Close()

	// Hold the proto message by pointer; proto structs should not be copied.
	cluster1 := &btapb.Cluster{Name: "cluster1"}
	failedLoc := "euro1"

	iAdminClient.iClient = &instanceAdminClientMock{
		Clusters:             []*btapb.Cluster{cluster1},
		UnavailableLocations: []string{failedLoc},
	}

	cis, err := iAdminClient.Clusters(context.Background(), "instance-id")
	convertedErr, ok := err.(ErrPartiallyUnavailable)
	if !ok {
		t.Fatalf("want error ErrPartiallyUnavailable, got %v", err)
	}
	if got, want := len(convertedErr.Locations), 1; got != want {
		t.Fatalf("want %v failed locations, got %v", want, got)
	}
	if got, want := convertedErr.Locations[0], failedLoc; got != want {
		t.Fatalf("want failed location %v, got %v", want, got)
	}
	if got, want := len(cis), 1; got != want {
		t.Fatalf("want %v clusters, got %v", want, got)
	}
	if got, want := cis[0].Name, cluster1.Name; got != want {
		t.Fatalf("want cluster %v, got %v", want, got)
	}
}

// TestIntegration_Granularity creates a table, adds a column family through
// the raw table admin client, and checks that the returned table reports
// millisecond timestamp granularity.
func TestIntegration_Granularity(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	timeout := 2 * time.Second
	if testEnv.Config().UseProd {
		timeout = 5 * time.Minute
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	// Registered before the other defers so it runs last, after table cleanup.
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	// list returns the sorted table names, failing the test on error.
	list := func() []string {
		tbls, err := adminClient.Tables(ctx)
		if err != nil {
			t.Fatalf("Fetching list of tables: %v", err)
		}
		sort.Strings(tbls)
		return tbls
	}
	// containsAll reports whether every element of want is present in got.
	containsAll := func(got, want []string) bool {
		gotSet := make(map[string]bool, len(got))
		for _, s := range got {
			gotSet[s] = true
		}
		for _, s := range want {
			if !gotSet[s] {
				return false
			}
		}
		return true
	}

	defer deleteTable(ctx, t, adminClient, "mytable")

	if err := adminClient.CreateTable(ctx, "mytable"); err != nil {
		t.Fatalf("Creating table: %v", err)
	}

	tables := list()
	if got, want := tables, []string{"mytable"}; !containsAll(got, want) {
		t.Errorf("adminClient.Tables returned %#v, want %#v", got, want)
	}

	// calling ModifyColumnFamilies to check the granularity of table
	prefix := adminClient.instancePrefix()
	req := &btapb.ModifyColumnFamiliesRequest{
		Name: prefix + "/tables/" + "mytable",
		Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
			Id:  "cf",
			Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{Create: &btapb.ColumnFamily{}},
		}},
	}
	table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req)
	if err != nil {
		t.Fatalf("Creating column family: %v", err)
	}
	wantGranularity := btapb.Table_MILLIS
	if table.Granularity != wantGranularity {
		t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, wantGranularity)
	}
}

// TestIntegration_InstanceAdminClient_AppProfile exercises the app profile
// lifecycle: create, get, list, a table of updates, and delete. It returns
// early when the environment yields a nil instance admin client.
func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	timeout := 2 * time.Second
	if testEnv.Config().UseProd {
		timeout = 5 * time.Minute
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	iAdminClient, err := testEnv.NewInstanceAdminClient()
	if err != nil {
		t.Fatalf("NewInstanceAdminClient: %v", err)
	}

	if iAdminClient == nil {
		return
	}
	defer iAdminClient.Close()

	const profileID = "app_profile1"
	profile := ProfileConf{
		ProfileID:     profileID,
		InstanceID:    adminClient.instance,
		ClusterID:     testEnv.Config().Cluster,
		Description:   "creating new app profile 1",
		RoutingPolicy: SingleClusterRouting,
	}

	createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile)
	if err != nil {
		t.Fatalf("Creating app profile: %v", err)
	}

	gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, profileID)
	if err != nil {
		t.Fatalf("Get app profile: %v", err)
	}

	if !proto.Equal(createdProfile, gotProfile) {
		t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name)
	}

	// list drains the app profile iterator for instanceID.
	list := func(instanceID string) ([]*btapb.AppProfile, error) {
		var profiles []*btapb.AppProfile

		it := iAdminClient.ListAppProfiles(ctx, instanceID)
		for {
			s, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				return nil, err
			}
			profiles = append(profiles, s)
		}
		// Return an explicit nil: the loop's err is scoped to the loop, so
		// naming err here would leak the enclosing test's variable.
		return profiles, nil
	}

	profiles, err := list(adminClient.instance)
	if err != nil {
		t.Fatalf("List app profile: %v", err)
	}

	if got, want := len(profiles), 1; got != want {
		t.Fatalf("Initial app profile list len: %d, want: %d", got, want)
	}

	for _, test := range []struct {
		desc   string
		uattrs ProfileAttrsToUpdate
		want   *btapb.AppProfile // nil means error
	}{
		{
			desc:   "empty update",
			uattrs: ProfileAttrsToUpdate{},
			want:   nil,
		},
		{
			desc:   "empty description update",
			uattrs: ProfileAttrsToUpdate{Description: ""},
			want: &btapb.AppProfile{
				Name:          gotProfile.Name,
				Description:   "",
				RoutingPolicy: gotProfile.RoutingPolicy,
				Etag:          gotProfile.Etag,
			},
		},
		{
			desc: "routing update",
			uattrs: ProfileAttrsToUpdate{
				RoutingPolicy: SingleClusterRouting,
				ClusterID:     testEnv.Config().Cluster,
			},
			want: &btapb.AppProfile{
				Name:        gotProfile.Name,
				Description: "",
				Etag:        gotProfile.Etag,
				RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{
					SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{
						ClusterId: testEnv.Config().Cluster,
					},
				},
			},
		},
	} {
		err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, profileID, test.uattrs)
		if err != nil {
			if test.want != nil {
				t.Errorf("%s: %v", test.desc, err)
			}
			continue
		}
		if test.want == nil {
			t.Errorf("%s: got nil, want error", test.desc)
			continue
		}

		got, getErr := iAdminClient.GetAppProfile(ctx, adminClient.instance, profileID)
		if getErr != nil {
			t.Fatalf("%s: Get app profile: %v", test.desc, getErr)
		}

		if !proto.Equal(got, test.want) {
			t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, got, test.want)
		}
	}

	err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, profileID)
	if err != nil {
		t.Fatalf("Delete app profile: %v", err)
	}
}

// TestIntegration_InstanceUpdate reads the instance info, resizes the
// configured cluster to numNodes, and checks that GetCluster reports the new
// node count. It returns early when the environment yields a nil instance
// admin client.
func TestIntegration_InstanceUpdate(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	timeout := 2 * time.Second
	if testEnv.Config().UseProd {
		timeout = 5 * time.Minute
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	iAdminClient, err := testEnv.NewInstanceAdminClient()
	if err != nil {
		t.Fatalf("NewInstanceAdminClient: %v", err)
	}

	if iAdminClient == nil {
		return
	}
	defer iAdminClient.Close()

	iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance)
	if err != nil {
		// Fatal: iInfo is dereferenced below and must not be used after an error.
		t.Fatalf("InstanceInfo: %v", err)
	}

	if iInfo.Name != adminClient.instance {
		t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance)
	}

	if iInfo.DisplayName != adminClient.instance {
		t.Errorf("InstanceInfo returned display name %#v, want %#v", iInfo.DisplayName, adminClient.instance)
	}

	const numNodes = 4
	// update cluster nodes
	if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil {
		t.Errorf("UpdateCluster: %v", err)
	}

	// get cluster after updating
	cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster)
	if err != nil {
		// Fatal: cis is dereferenced below and must not be used after an error.
		t.Fatalf("GetCluster %v", err)
	}
	if cis.ServeNodes != int(numNodes) {
		t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes))
	}
}

// TestIntegration_AdminBackup walks a backup through its lifecycle against
// production: create, list, get, update expiry, restore into a new table, and
// delete. It is skipped when not running against production.
func TestIntegration_AdminBackup(t *testing.T) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		t.Fatalf("IntegrationEnv: %v", err)
	}
	defer testEnv.Close()

	if !testEnv.Config().UseProd {
		t.Skip("emulator doesn't support backups")
	}

	timeout := 5 * time.Minute
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	// Registered before the cleanup defers so it runs after them.
	defer cancel()

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		t.Fatalf("NewAdminClient: %v", err)
	}
	defer adminClient.Close()

	table := testEnv.Config().Table
	cluster := testEnv.Config().Cluster

	// Delete the table at the end of the test. Schedule ahead of time
	// in case the client fails
	defer deleteTable(ctx, t, adminClient, table)

	// list drains the backup iterator for cluster.
	list := func(cluster string) ([]*BackupInfo, error) {
		var infos []*BackupInfo

		it := adminClient.Backups(ctx, cluster)
		for {
			s, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				return nil, err
			}
			infos = append(infos, s)
		}
		// Return an explicit nil rather than the enclosing test's err.
		return infos, nil
	}

	if err := adminClient.CreateTable(ctx, table); err != nil {
		t.Fatalf("Creating table: %v", err)
	}

	// Precondition: no backups
	backups, err := list(cluster)
	if err != nil {
		t.Fatalf("Initial backup list: %v", err)
	}
	if got, want := len(backups), 0; got != want {
		t.Fatalf("Initial backup list len: %d, want: %d", got, want)
	}

	// Create backup
	backupName := "mybackup"
	// Best-effort cleanup for early exits; the error is deliberately ignored
	// because the backup is normally already deleted by the end of the test.
	defer adminClient.DeleteBackup(ctx, cluster, backupName)

	if err = adminClient.CreateBackup(ctx, table, cluster, backupName, time.Now().Add(8*time.Hour)); err != nil {
		t.Fatalf("Creating backup: %v", err)
	}

	// List backup
	backups, err = list(cluster)
	if err != nil {
		t.Fatalf("Listing backups: %v", err)
	}
	if got, want := len(backups), 1; got != want {
		t.Fatalf("Listing backup count: %d, want: %d", got, want)
	}
	if got, want := backups[0].Name, backupName; got != want {
		t.Fatalf("Backup name: %s, want: %s", got, want)
	}
	if got, want := backups[0].SourceTable, table; got != want {
		t.Fatalf("Backup SourceTable: %s, want: %s", got, want)
	}
	if got, want := backups[0].ExpireTime, backups[0].StartTime.Add(8*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 {
		t.Fatalf("Backup ExpireTime: %s, want: %s", got, want)
	}

	// Get backup
	backup, err := adminClient.BackupInfo(ctx, cluster, backupName)
	if err != nil {
		t.Fatalf("BackupInfo: %v", err)
	}
	if got, want := *backup, *backups[0]; got != want {
		t.Fatalf("BackupInfo: %v, want: %v", got, want)
	}

	// Update backup
	newExpireTime := time.Now().Add(10 * time.Hour)
	err = adminClient.UpdateBackup(ctx, cluster, backupName, newExpireTime)
	if err != nil {
		t.Fatalf("UpdateBackup failed: %v", err)
	}

	// Check that updated backup has the correct expire time
	updatedBackup, err := adminClient.BackupInfo(ctx, cluster, backupName)
	if err != nil {
		t.Fatalf("BackupInfo: %v", err)
	}
	backup.ExpireTime = newExpireTime
	// Server clock and local clock may not be perfectly sync'ed, so allow a
	// one-minute skew in either direction.
	skew := updatedBackup.ExpireTime.Sub(backup.ExpireTime)
	if skew < 0 {
		skew = -skew
	}
	if skew > time.Minute {
		t.Fatalf("BackupInfo: %v, want: %v", *updatedBackup, *backup)
	}

	// Restore backup
	restoredTable := table + "-restored"
	defer deleteTable(ctx, t, adminClient, restoredTable)
	if err = adminClient.RestoreTable(ctx, restoredTable, cluster, backupName); err != nil {
		t.Fatalf("RestoreTable: %v", err)
	}
	if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil {
		t.Fatalf("Restored TableInfo: %v", err)
	}

	// Delete backup
	if err = adminClient.DeleteBackup(ctx, cluster, backupName); err != nil {
		t.Fatalf("DeleteBackup: %v", err)
	}
	backups, err = list(cluster)
	if err != nil {
		t.Fatalf("List after Delete: %v", err)
	}
	if got, want := len(backups), 0; got != want {
		t.Fatalf("List after delete len: %d, want: %d", got, want)
	}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
	ctx := context.Background()
	testEnv, _, _, table, _, cleanup, err := setupIntegration(ctx, t)
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	if !testEnv.Config().AttemptDirectPath {
		t.Skip()
	}

	if len(blackholeDpv6Cmd) == 0 {
		t.Fatal("-it.blackhole-dpv6-cmd unset")
	}
	if len(blackholeDpv4Cmd) == 0 {
		t.Fatal("-it.blackhole-dpv4-cmd unset")
	}
	if len(allowDpv6Cmd) == 0 {
		t.Fatal("-it.allowdpv6-cmd unset")
	}
	if len(allowDpv4Cmd) == 0 {
		t.Fatal("-it.allowdpv4-cmd unset")
	}

	if err := populatePresidentsGraph(table); err != nil {
		t.Fatal(err)
	}

	// Precondition: wait for DirectPath to connect.
	dpEnabled := examineTraffic(ctx, testEnv, table, false)
	if !dpEnabled {
		t.Fatalf("Failed to observe RPCs over DirectPath")
	}

	// Enable the blackhole, which will prevent communication with grpclb and thus DirectPath.
	blackholeDirectPath(testEnv, t)
	dpDisabled := examineTraffic(ctx, testEnv, table, true)
	if !dpDisabled {
		t.Fatalf("Failed to fallback to CFE after blackhole DirectPath")
	}

	// Disable the blackhole, and client should use DirectPath again.
	allowDirectPath(testEnv, t)
	dpEnabled = examineTraffic(ctx, testEnv, table, false)
	if !dpEnabled {
		t.Fatalf("Failed to fallback to DirectPath after unblackhole DirectPath")
	}
}

// examineTraffic returns whether RPCs use DirectPath (blackholeDP = false) or CFE (blackholeDP = true).
//
// It issues ReadRow calls in batches of numRPCsToSend, 100ms apart, for up to
// two minutes. After each call it inspects the peer address and counts the
// call when the observed path matches the expectation. The count is
// cumulative, not consecutive; the function returns true as soon as it
// reaches minCompleteRPC and false if the time budget runs out first.
func examineTraffic(ctx context.Context, testEnv IntegrationEnv, table *Table, blackholeDP bool) bool {
	const (
		numRPCsToSend  = 20
		minCompleteRPC = 40
	)
	numCount := 0

	start := time.Now()
	for time.Since(start) < 2*time.Minute {
		for i := 0; i < numRPCsToSend; i++ {
			// The read result is irrelevant; only the peer address it leaves
			// behind is examined, so errors are deliberately ignored.
			_, _ = table.ReadRow(ctx, "jadams")
			if _, useDP := isDirectPathRemoteAddress(testEnv); useDP != blackholeDP {
				numCount++
				if numCount >= minCompleteRPC {
					return true
				}
			}
			time.Sleep(100 * time.Millisecond)
		}
	}
	return false
}

// setupIntegration builds the common fixture for data-plane integration tests:
// an environment, a data client, an admin client, and a fresh table with a
// "follows" column family.
//
// The table name is freshly generated against production and taken from the
// environment config otherwise. On success the returned cleanup deletes the
// table, cancels the fixture context and closes both clients. On failure all
// non-error results are zero values and everything acquired so far is
// released.
func setupIntegration(ctx context.Context, t *testing.T) (_ IntegrationEnv, _ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) {
	testEnv, err := NewIntegrationEnv()
	if err != nil {
		return nil, nil, nil, nil, "", nil, err
	}

	var timeout time.Duration
	if testEnv.Config().UseProd {
		timeout = 10 * time.Minute
		t.Logf("Running test against production")
	} else {
		timeout = 5 * time.Minute
		t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint)
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)

	client, err := testEnv.NewClient()
	if err != nil {
		cancel()
		return nil, nil, nil, nil, "", nil, err
	}

	adminClient, err := testEnv.NewAdminClient()
	if err != nil {
		client.Close()
		cancel()
		return nil, nil, nil, nil, "", nil, err
	}

	// releaseClients undoes the acquisitions above on a late failure.
	releaseClients := func() {
		cancel()
		client.Close()
		adminClient.Close()
	}

	if testEnv.Config().UseProd {
		// TODO: tables may not be successfully deleted in some cases, and will
		// become obsolete. We may need a way to automatically delete them.
		tableName = tableNameSpace.New()
	} else {
		tableName = testEnv.Config().Table
	}

	if err := adminClient.CreateTable(ctx, tableName); err != nil {
		releaseClients()
		return nil, nil, nil, nil, "", nil, err
	}
	if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil {
		// Best effort: do not leave the half-initialized table behind. The
		// error is ignored because the original failure is what matters.
		_ = adminClient.DeleteTable(ctx, tableName)
		releaseClients()
		return nil, nil, nil, nil, "", nil, err
	}

	return testEnv, client, adminClient, client.Open(tableName), tableName, func() {
		if err := adminClient.DeleteTable(ctx, tableName); err != nil {
			t.Errorf("DeleteTable got error %v", err)
		}
		cancel()
		client.Close()
		adminClient.Close()
	}, nil
}

// formatReadItem renders ri as "row-qualifier-value". Everything up to and
// including the first ':' of the column is dropped; a column without ':' is
// used whole.
func formatReadItem(ri ReadItem) string {
	// Use the column qualifier only to make the test data briefer.
	col := ri.Column[strings.Index(ri.Column, ":")+1:]
	return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value)
}

// fill overwrites b with repeated copies of sub, stopping once the remaining
// tail is no longer than sub; that tail (at most len(sub) bytes) is left
// untouched. An empty sub leaves b unchanged.
func fill(b, sub []byte) {
	if len(sub) == 0 {
		// copy would make no progress and the loop below would never end.
		return
	}
	for len(b) > len(sub) {
		n := copy(b, sub)
		b = b[n:]
	}
}

// clearTimestamps zeroes the Timestamp of every item in r, in place.
func clearTimestamps(r Row) {
	for _, ris := range r {
		for i := range ris {
			ris[i].Timestamp = 0
		}
	}
}

// deleteTable deletes the named table, retrying with backoff for up to 30
// seconds (bounded additionally by ctx). A final failure is logged, not
// treated as a test failure.
func deleteTable(ctx context.Context, t *testing.T, ac *AdminClient, name string) {
	bo := gax.Backoff{
		Initial:    100 * time.Millisecond,
		Max:        2 * time.Second,
		Multiplier: 1.2,
	}
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	err := internal.Retry(ctx, bo, func() (bool, error) {
		if err := ac.DeleteTable(ctx, name); err != nil {
			return false, err
		}
		return true, nil
	})
	if err != nil {
		t.Logf("DeleteTable: %v", err)
	}
}

// verifyDirectPathRemoteAddress fails the test when DirectPath is being
// attempted but the most recent peer address is not a DirectPath address. It
// does nothing when DirectPath is not being attempted.
func verifyDirectPathRemoteAddress(testEnv IntegrationEnv, t *testing.T) {
	t.Helper()
	if !testEnv.Config().AttemptDirectPath {
		return
	}
	remoteIP, ok := isDirectPathRemoteAddress(testEnv)
	if ok {
		return
	}
	if testEnv.Config().DirectPathIPV4Only {
		t.Fatalf("Expect to access DirectPath via ipv4 only, but RPC was destined to %s", remoteIP)
	}
	t.Fatalf("Expect to access DirectPath via ipv4 or ipv6, but RPC was destined to %s", remoteIP)
}

// isDirectPathRemoteAddress returns the environment's current peer address and
// whether it starts with a DirectPath prefix. In ipv4-only mode only the ipv4
// prefix qualifies; otherwise either prefix does.
func isDirectPathRemoteAddress(testEnv IntegrationEnv) (_ string, _ bool) {
	remoteIP := testEnv.Peer().Addr.String()
	// DirectPath ipv4-only can only use ipv4 traffic.
	if testEnv.Config().DirectPathIPV4Only {
		return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix)
	}
	// DirectPath ipv6 can use either ipv4 or ipv6 traffic.
	return remoteIP, strings.HasPrefix(remoteIP, directPathIPV4Prefix) || strings.HasPrefix(remoteIP, directPathIPV6Prefix)
}

// runDirectPathCmd runs cmd through "bash -c" and logs its combined output.
// A command failure is logged rather than failing the test, keeping these
// network toggles best-effort.
func runDirectPathCmd(t *testing.T, cmd string) {
	t.Helper()
	out, err := exec.Command("bash", "-c", cmd).CombinedOutput()
	// Log through an explicit verb: the output is arbitrary text and must not
	// be interpreted as a format string.
	t.Logf("%s", out)
	if err != nil {
		t.Logf("running %q: %v", cmd, err)
	}
}

// blackholeDirectPath runs the ipv4 blackhole command and, unless the
// environment is ipv4-only, the ipv6 one as well.
func blackholeDirectPath(testEnv IntegrationEnv, t *testing.T) {
	runDirectPathCmd(t, blackholeDpv4Cmd)
	if testEnv.Config().DirectPathIPV4Only {
		return
	}
	runDirectPathCmd(t, blackholeDpv6Cmd)
}

// allowDirectPath runs the ipv4 allow command and, unless the environment is
// ipv4-only, the ipv6 one as well.
func allowDirectPath(testEnv IntegrationEnv, t *testing.T) {
	runDirectPathCmd(t, allowDpv4Cmd)
	if testEnv.Config().DirectPathIPV4Only {
		return
	}
	runDirectPathCmd(t, allowDpv6Cmd)
}