1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "flag" 22 "fmt" 23 "math" 24 "math/rand" 25 "sort" 26 "strings" 27 "sync" 28 "testing" 29 "time" 30 31 "cloud.google.com/go/internal/testutil" 32 "github.com/golang/protobuf/proto" 33 "google.golang.org/api/iterator" 34 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 35) 36 37var presidentsSocialGraph = map[string][]string{ 38 "wmckinley": {"tjefferson"}, 39 "gwashington": {"jadams"}, 40 "tjefferson": {"gwashington", "jadams"}, 41 "jadams": {"gwashington", "tjefferson"}, 42} 43 44func populatePresidentsGraph(table *Table) error { 45 ctx := context.Background() 46 for row, ss := range presidentsSocialGraph { 47 mut := NewMutation() 48 for _, name := range ss { 49 mut.Set("follows", name, 1000, []byte("1")) 50 } 51 if err := table.Apply(ctx, row, mut); err != nil { 52 return fmt.Errorf("Mutating row %q: %v", row, err) 53 } 54 } 55 return nil 56} 57 58var instanceToCreate string 59var instanceToCreateZone string 60 61func init() { 62 // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. 63 flag.StringVar(&instanceToCreate, "it.instance-to-create", "", 64 "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.") 65 flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b", 66 "The zone in which to create the new test instance.") 67} 68 69func TestIntegration_ConditionalMutations(t *testing.T) { 70 ctx := context.Background() 71 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 72 if err != nil { 73 t.Fatal(err) 74 } 75 defer cleanup() 76 77 if err := populatePresidentsGraph(table); err != nil { 78 t.Fatal(err) 79 } 80 81 // Do a conditional mutation with a complex filter. 82 mutTrue := NewMutation() 83 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 84 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 85 mut := NewCondMutation(filter, mutTrue, nil) 86 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 87 t.Fatalf("Conditionally mutating row: %v", err) 88 } 89 // Do a second condition mutation with a filter that does not match, 90 // and thus no changes should be made. 91 mutTrue = NewMutation() 92 mutTrue.DeleteRow() 93 filter = ColumnFilter("snoop.dogg") 94 mut = NewCondMutation(filter, mutTrue, nil) 95 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 96 t.Fatalf("Conditionally mutating row: %v", err) 97 } 98 99 // Fetch a row. 100 row, err := table.ReadRow(ctx, "jadams") 101 if err != nil { 102 t.Fatalf("Reading a row: %v", err) 103 } 104 wantRow := Row{ 105 "follows": []ReadItem{ 106 {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 107 {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 108 }, 109 } 110 if !testutil.Equal(row, wantRow) { 111 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 112 } 113} 114 115func TestIntegration_PartialReadRows(t *testing.T) { 116 ctx := context.Background() 117 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 118 if err != nil { 119 t.Fatal(err) 120 } 121 defer cleanup() 122 123 if err := populatePresidentsGraph(table); err != nil { 124 t.Fatal(err) 125 } 126 127 // Do a scan and stop part way through. 128 // Verify that the ReadRows callback doesn't keep running. 129 stopped := false 130 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 131 if r.Key() < "h" { 132 return true 133 } 134 if !stopped { 135 stopped = true 136 return false 137 } 138 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 139 return false 140 }) 141 if err != nil { 142 t.Fatalf("Partial ReadRows: %v", err) 143 } 144} 145 146func TestIntegration_ReadRowList(t *testing.T) { 147 ctx := context.Background() 148 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 149 if err != nil { 150 t.Fatal(err) 151 } 152 defer cleanup() 153 154 if err := populatePresidentsGraph(table); err != nil { 155 t.Fatal(err) 156 } 157 158 // Read a RowList 159 var elt []string 160 keys := RowList{"wmckinley", "gwashington", "jadams"} 161 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 162 err = table.ReadRows(ctx, keys, func(r Row) bool { 163 for _, ris := range r { 164 for _, ri := range ris { 165 elt = append(elt, formatReadItem(ri)) 166 } 167 } 168 return true 169 }) 170 if err != nil { 171 t.Fatalf("read RowList: %v", err) 172 } 173 174 if got := strings.Join(elt, ","); got != want { 175 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 176 } 177} 178 179func TestIntegration_DeleteRow(t *testing.T) { 180 ctx := context.Background() 181 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 182 if err != nil { 183 t.Fatal(err) 184 } 185 defer cleanup() 186 187 if err := populatePresidentsGraph(table); err != nil { 188 t.Fatal(err) 189 } 190 191 // Delete a row and check it goes away. 192 mut := NewMutation() 193 mut.DeleteRow() 194 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 195 t.Fatalf("Apply DeleteRow: %v", err) 196 } 197 row, err := table.ReadRow(ctx, "wmckinley") 198 if err != nil { 199 t.Fatalf("Reading a row after DeleteRow: %v", err) 200 } 201 if len(row) != 0 { 202 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 203 } 204} 205 206func TestIntegration_ReadModifyWrite(t *testing.T) { 207 ctx := context.Background() 208 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 209 if err != nil { 210 t.Fatal(err) 211 } 212 defer cleanup() 213 214 if err := populatePresidentsGraph(table); err != nil { 215 t.Fatal(err) 216 } 217 218 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 219 t.Fatalf("Creating column family: %v", err) 220 } 221 222 appendRMW := func(b []byte) *ReadModifyWrite { 223 rmw := NewReadModifyWrite() 224 rmw.AppendValue("counter", "likes", b) 225 return rmw 226 } 227 incRMW := func(n int64) *ReadModifyWrite { 228 rmw := NewReadModifyWrite() 229 rmw.Increment("counter", "likes", n) 230 return rmw 231 } 232 rmwSeq := []struct { 233 desc string 234 rmw *ReadModifyWrite 235 want []byte 236 }{ 237 { 238 desc: "append #1", 239 rmw: appendRMW([]byte{0, 0, 0}), 240 want: []byte{0, 0, 0}, 241 }, 242 { 243 desc: "append #2", 244 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 245 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 246 }, 247 { 248 desc: "increment", 249 rmw: incRMW(8), 250 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 251 }, 252 } 253 for _, step := range rmwSeq { 254 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 255 if err != nil { 256 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 257 } 258 // Make sure the modified cell returned by the RMW operation has a timestamp. 259 if row["counter"][0].Timestamp == 0 { 260 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 261 } 262 clearTimestamps(row) 263 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 264 if !testutil.Equal(row, wantRow) { 265 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 266 } 267 } 268 269 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 270 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 271 if err != nil { 272 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 273 } 274 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 275 if err != nil { 276 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 277 } 278 // Get only the correct row back on read. 279 r, err := table.ReadRow(ctx, "issue-723-1") 280 if err != nil { 281 t.Fatalf("Reading row: %v", err) 282 } 283 if r.Key() != "issue-723-1" { 284 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 285 } 286} 287 288func TestIntegration_ArbitraryTimestamps(t *testing.T) { 289 ctx := context.Background() 290 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 291 if err != nil { 292 t.Fatal(err) 293 } 294 defer cleanup() 295 296 // Test arbitrary timestamps more thoroughly. 297 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 298 t.Fatalf("Creating column family: %v", err) 299 } 300 const numVersions = 4 301 mut := NewMutation() 302 for i := 1; i < numVersions; i++ { 303 // Timestamps are used in thousands because the server 304 // only permits that granularity. 305 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 306 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 307 } 308 if err := table.Apply(ctx, "testrow", mut); err != nil { 309 t.Fatalf("Mutating row: %v", err) 310 } 311 r, err := table.ReadRow(ctx, "testrow") 312 if err != nil { 313 t.Fatalf("Reading row: %v", err) 314 } 315 wantRow := Row{"ts": []ReadItem{ 316 // These should be returned in descending timestamp order. 317 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 318 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 319 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 320 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 321 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 322 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 323 }} 324 if !testutil.Equal(r, wantRow) { 325 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 326 } 327 328 // Do the same read, but filter to the latest two versions. 329 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 330 if err != nil { 331 t.Fatalf("Reading row: %v", err) 332 } 333 wantRow = Row{"ts": []ReadItem{ 334 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 335 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 336 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 337 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 338 }} 339 if !testutil.Equal(r, wantRow) { 340 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 341 } 342 // Check cell offset / limit 343 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 344 if err != nil { 345 t.Fatalf("Reading row: %v", err) 346 } 347 wantRow = Row{"ts": []ReadItem{ 348 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 349 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 350 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 351 }} 352 if !testutil.Equal(r, wantRow) { 353 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 354 } 355 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) 356 if err != nil { 357 t.Fatalf("Reading row: %v", err) 358 } 359 wantRow = Row{"ts": []ReadItem{ 360 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 361 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 362 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 363 }} 364 if !testutil.Equal(r, wantRow) { 365 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 366 } 367 // Check timestamp range filtering (with truncation) 368 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 369 if err != nil { 370 t.Fatalf("Reading row: %v", err) 371 } 372 wantRow = Row{"ts": []ReadItem{ 373 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 374 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 375 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 376 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 377 }} 378 if !testutil.Equal(r, wantRow) { 379 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 380 } 381 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 382 if err != nil { 383 t.Fatalf("Reading row: %v", err) 384 } 385 wantRow = Row{"ts": []ReadItem{ 386 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 387 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 388 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 389 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 390 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 391 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 392 }} 393 if !testutil.Equal(r, wantRow) { 394 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 395 } 396 // Delete non-existing cells, no such column family in this row 397 // Should not delete anything 398 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 399 t.Fatalf("Creating column family: %v", err) 400 } 401 mut = NewMutation() 402 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 403 if err := table.Apply(ctx, "testrow", mut); err != nil { 404 t.Fatalf("Mutating row: %v", err) 405 } 406 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 407 if err != nil { 408 t.Fatalf("Reading row: %v", err) 409 } 410 if !testutil.Equal(r, wantRow) { 411 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 412 } 413 // Delete non-existing cells, no such column in this column family 414 // Should not delete anything 415 mut = NewMutation() 416 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 417 if err := table.Apply(ctx, "testrow", mut); err != nil { 418 t.Fatalf("Mutating row: %v", err) 419 } 420 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 421 if err != nil { 422 t.Fatalf("Reading row: %v", err) 423 } 424 if !testutil.Equal(r, wantRow) { 425 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 426 } 427 // Delete the cell with timestamp 2000 and repeat the last read, 428 // checking that we get ts 3000 and ts 1000. 429 mut = NewMutation() 430 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 431 if err := table.Apply(ctx, "testrow", mut); err != nil { 432 t.Fatalf("Mutating row: %v", err) 433 } 434 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 435 if err != nil { 436 t.Fatalf("Reading row: %v", err) 437 } 438 wantRow = Row{"ts": []ReadItem{ 439 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 440 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 441 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 442 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 443 }} 444 if !testutil.Equal(r, wantRow) { 445 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 446 } 447 448 // Check DeleteCellsInFamily 449 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 450 t.Fatalf("Creating column family: %v", err) 451 } 452 453 mut = NewMutation() 454 mut.Set("status", "start", 2000, []byte("2")) 455 mut.Set("status", "end", 3000, []byte("3")) 456 mut.Set("ts", "col", 1000, []byte("1")) 457 if err := table.Apply(ctx, "row1", mut); err != nil { 458 t.Fatalf("Mutating row: %v", err) 459 } 460 if err := table.Apply(ctx, "row2", mut); err != nil { 461 t.Fatalf("Mutating row: %v", err) 462 } 463 464 mut = NewMutation() 465 mut.DeleteCellsInFamily("status") 466 if err := table.Apply(ctx, "row1", mut); err != nil { 467 t.Fatalf("Delete cf: %v", err) 468 } 469 470 // ColumnFamily removed 471 r, err = table.ReadRow(ctx, "row1") 472 if err != nil { 473 t.Fatalf("Reading row: %v", err) 474 } 475 wantRow = Row{"ts": []ReadItem{ 476 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 477 }} 478 if !testutil.Equal(r, wantRow) { 479 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 480 } 481 482 // ColumnFamily not removed 483 r, err = table.ReadRow(ctx, "row2") 484 if err != nil { 485 t.Fatalf("Reading row: %v", err) 486 } 487 wantRow = Row{ 488 "ts": []ReadItem{ 489 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 490 }, 491 "status": []ReadItem{ 492 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 493 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 494 }, 495 } 496 if !testutil.Equal(r, wantRow) { 497 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 498 } 499 500 // Check DeleteCellsInColumn 501 mut = NewMutation() 502 mut.Set("status", "start", 2000, []byte("2")) 503 mut.Set("status", "middle", 3000, []byte("3")) 504 mut.Set("status", "end", 1000, []byte("1")) 505 if err := table.Apply(ctx, "row3", mut); err != nil { 506 t.Fatalf("Mutating row: %v", err) 507 } 508 mut = NewMutation() 509 mut.DeleteCellsInColumn("status", "middle") 510 if err := table.Apply(ctx, "row3", mut); err != nil { 511 t.Fatalf("Delete column: %v", err) 512 } 513 r, err = table.ReadRow(ctx, "row3") 514 if err != nil { 515 t.Fatalf("Reading row: %v", err) 516 } 517 wantRow = Row{ 518 "status": []ReadItem{ 519 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 520 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 521 }, 522 } 523 if !testutil.Equal(r, wantRow) { 524 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 525 } 526 mut = NewMutation() 527 mut.DeleteCellsInColumn("status", "start") 528 if err := table.Apply(ctx, "row3", mut); err != nil { 529 t.Fatalf("Delete column: %v", err) 530 } 531 r, err = table.ReadRow(ctx, "row3") 532 if err != nil { 533 t.Fatalf("Reading row: %v", err) 534 } 535 wantRow = Row{ 536 "status": []ReadItem{ 537 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 538 }, 539 } 540 if !testutil.Equal(r, wantRow) { 541 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 542 } 543 mut = NewMutation() 544 mut.DeleteCellsInColumn("status", "end") 545 if err := table.Apply(ctx, "row3", mut); err != nil { 546 t.Fatalf("Delete column: %v", err) 547 } 548 r, err = table.ReadRow(ctx, "row3") 549 if err != nil { 550 t.Fatalf("Reading row: %v", err) 551 } 552 if len(r) != 0 { 553 t.Fatalf("Delete column: got %v, want empty row", r) 554 } 555 // Add same cell after delete 556 mut = NewMutation() 557 mut.Set("status", "end", 1000, []byte("1")) 558 if err := table.Apply(ctx, "row3", mut); err != nil { 559 t.Fatalf("Mutating row: %v", err) 560 } 561 r, err = table.ReadRow(ctx, "row3") 562 if err != nil { 563 t.Fatalf("Reading row: %v", err) 564 } 565 if !testutil.Equal(r, wantRow) { 566 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 567 } 568} 569 570func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 571 ctx := context.Background() 572 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 573 if err != nil { 574 t.Fatal(err) 575 } 576 defer cleanup() 577 578 if err := populatePresidentsGraph(table); err != nil { 579 t.Fatal(err) 580 } 581 582 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 583 t.Fatalf("Creating column family: %v", err) 584 } 585 586 // Do highly concurrent reads/writes. 587 const maxConcurrency = 1000 588 var wg sync.WaitGroup 589 for i := 0; i < maxConcurrency; i++ { 590 wg.Add(1) 591 go func() { 592 defer wg.Done() 593 switch r := rand.Intn(100); { // r ∈ [0,100) 594 case 0 <= r && r < 30: 595 // Do a read. 596 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 597 if err != nil { 598 t.Errorf("Concurrent read: %v", err) 599 } 600 case 30 <= r && r < 100: 601 // Do a write. 602 mut := NewMutation() 603 mut.Set("ts", "col", 1000, []byte("data")) 604 if err := table.Apply(ctx, "testrow", mut); err != nil { 605 t.Errorf("Concurrent write: %v", err) 606 } 607 } 608 }() 609 } 610 wg.Wait() 611} 612 613func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 614 ctx := context.Background() 615 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 616 if err != nil { 617 t.Fatal(err) 618 } 619 defer cleanup() 620 621 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 622 t.Fatalf("Creating column family: %v", err) 623 } 624 625 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 626 nonsense := []byte("lorem ipsum dolor sit amet, ") 627 fill(bigBytes, nonsense) 628 mut := NewMutation() 629 mut.Set("ts", "col", 1000, bigBytes) 630 if err := table.Apply(ctx, "bigrow", mut); err != nil { 631 t.Fatalf("Big write: %v", err) 632 } 633 r, err := table.ReadRow(ctx, "bigrow") 634 if err != nil { 635 t.Fatalf("Big read: %v", err) 636 } 637 wantRow := Row{"ts": []ReadItem{ 638 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 639 }} 640 if !testutil.Equal(r, wantRow) { 641 t.Fatalf("Big read returned incorrect bytes: %v", r) 642 } 643 644 var wg sync.WaitGroup 645 // Now write 1000 rows, each with 82 KB values, then scan them all. 646 medBytes := make([]byte, 82<<10) 647 fill(medBytes, nonsense) 648 sem := make(chan int, 50) // do up to 50 mutations at a time. 649 for i := 0; i < 1000; i++ { 650 mut := NewMutation() 651 mut.Set("ts", "big-scan", 1000, medBytes) 652 row := fmt.Sprintf("row-%d", i) 653 wg.Add(1) 654 go func() { 655 defer wg.Done() 656 defer func() { <-sem }() 657 sem <- 1 658 if err := table.Apply(ctx, row, mut); err != nil { 659 t.Errorf("Preparing large scan: %v", err) 660 } 661 }() 662 } 663 wg.Wait() 664 n := 0 665 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 666 for _, ris := range r { 667 for _, ri := range ris { 668 n += len(ri.Value) 669 } 670 } 671 return true 672 }, RowFilter(ColumnFilter("big-scan"))) 673 if err != nil { 674 t.Fatalf("Doing large scan: %v", err) 675 } 676 if want := 1000 * len(medBytes); n != want { 677 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 678 } 679 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 680 rc := 0 681 wantRc := 3 682 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 683 rc++ 684 return true 685 }, LimitRows(int64(wantRc))) 686 if err != nil { 687 t.Fatal(err) 688 } 689 if rc != wantRc { 690 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 691 } 692 693 // Test bulk mutations 694 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 695 t.Fatalf("Creating column family: %v", err) 696 } 697 bulkData := map[string][]string{ 698 "red sox": {"2004", "2007", "2013"}, 699 "patriots": {"2001", "2003", "2004", "2014"}, 700 "celtics": {"1981", "1984", "1986", "2008"}, 701 } 702 var rowKeys []string 703 var muts []*Mutation 704 for row, ss := range bulkData { 705 mut := NewMutation() 706 for _, name := range ss { 707 mut.Set("bulk", name, 1000, []byte("1")) 708 } 709 rowKeys = append(rowKeys, row) 710 muts = append(muts, mut) 711 } 712 status, err := table.ApplyBulk(ctx, rowKeys, muts) 713 if err != nil { 714 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 715 } 716 if status != nil { 717 t.Fatalf("non-nil errors: %v", err) 718 } 719 720 // Read each row back 721 for rowKey, ss := range bulkData { 722 row, err := table.ReadRow(ctx, rowKey) 723 if err != nil { 724 t.Fatalf("Reading a bulk row: %v", err) 725 } 726 var wantItems []ReadItem 727 for _, val := range ss { 728 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 729 } 730 wantRow := Row{"bulk": wantItems} 731 if !testutil.Equal(row, wantRow) { 732 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 733 } 734 } 735 736 // Test bulk write errors. 737 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 738 badMut := NewMutation() 739 badMut.Set("badfamily", "col", ServerTime, nil) 740 badMut2 := NewMutation() 741 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 742 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 743 if err != nil { 744 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 745 } 746 if status == nil { 747 t.Fatalf("No errors for bad bulk mutation") 748 } else if status[0] == nil || status[1] == nil { 749 t.Fatalf("No error for bad bulk mutation") 750 } 751} 752 753func TestIntegration_Read(t *testing.T) { 754 ctx := context.Background() 755 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 756 if err != nil { 757 t.Fatal(err) 758 } 759 defer cleanup() 760 761 // Insert some data. 762 initialData := map[string][]string{ 763 "wmckinley": {"tjefferson"}, 764 "gwashington": {"jadams"}, 765 "tjefferson": {"gwashington", "jadams", "wmckinley"}, 766 "jadams": {"gwashington", "tjefferson"}, 767 } 768 for row, ss := range initialData { 769 mut := NewMutation() 770 for _, name := range ss { 771 mut.Set("follows", name, 1000, []byte("1")) 772 } 773 if err := table.Apply(ctx, row, mut); err != nil { 774 t.Fatalf("Mutating row %q: %v", row, err) 775 } 776 } 777 778 for _, test := range []struct { 779 desc string 780 rr RowSet 781 filter Filter // may be nil 782 limit ReadOption // may be nil 783 784 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 785 // and join with a comma. 786 want string 787 }{ 788 { 789 desc: "read all, unfiltered", 790 rr: RowRange{}, 791 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 792 }, 793 { 794 desc: "read with InfiniteRange, unfiltered", 795 rr: InfiniteRange("tjefferson"), 796 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 797 }, 798 { 799 desc: "read with NewRange, unfiltered", 800 rr: NewRange("gargamel", "hubbard"), 801 want: "gwashington-jadams-1", 802 }, 803 { 804 desc: "read with PrefixRange, unfiltered", 805 rr: PrefixRange("jad"), 806 want: "jadams-gwashington-1,jadams-tjefferson-1", 807 }, 808 { 809 desc: "read with SingleRow, unfiltered", 810 rr: SingleRow("wmckinley"), 811 want: "wmckinley-tjefferson-1", 812 }, 813 { 814 desc: "read all, with ColumnFilter", 815 rr: RowRange{}, 816 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 817 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 818 }, 819 { 820 desc: "read all, with ColumnFilter, prefix", 821 rr: RowRange{}, 822 filter: ColumnFilter("j"), // no matches 823 want: "", 824 }, 825 { 826 desc: "read range, with ColumnRangeFilter", 827 rr: RowRange{}, 828 filter: ColumnRangeFilter("follows", "h", "k"), 829 want: "gwashington-jadams-1,tjefferson-jadams-1", 830 }, 831 { 832 desc: "read range from empty, with ColumnRangeFilter", 833 rr: RowRange{}, 834 filter: ColumnRangeFilter("follows", "", "u"), 835 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 836 }, 837 { 838 desc: "read range from start to empty, with ColumnRangeFilter", 839 rr: RowRange{}, 840 filter: ColumnRangeFilter("follows", "h", ""), 841 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 842 }, 843 { 844 desc: "read with RowKeyFilter", 845 rr: RowRange{}, 846 filter: RowKeyFilter(".*wash.*"), 847 want: "gwashington-jadams-1", 848 }, 849 { 850 desc: "read with RowKeyFilter, prefix", 851 rr: RowRange{}, 852 filter: RowKeyFilter("gwash"), 853 want: "", 854 }, 855 { 856 desc: "read with RowKeyFilter, no matches", 857 rr: RowRange{}, 858 filter: RowKeyFilter(".*xxx.*"), 859 want: "", 860 }, 861 { 862 desc: "read with FamilyFilter, no matches", 863 rr: RowRange{}, 864 filter: FamilyFilter(".*xxx.*"), 865 want: "", 866 }, 867 { 868 desc: "read with ColumnFilter + row limit", 869 rr: RowRange{}, 870 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 871 limit: LimitRows(2), 872 want: "gwashington-jadams-1,jadams-tjefferson-1", 873 }, 874 { 875 desc: "read all, strip values", 876 rr: RowRange{}, 877 filter: StripValueFilter(), 878 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 879 }, 880 { 881 desc: "read with ColumnFilter + row limit + strip values", 882 rr: RowRange{}, 883 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" 884 limit: LimitRows(2), 885 want: "gwashington-jadams-,jadams-tjefferson-", 886 }, 887 { 888 desc: "read with condition, strip values on true", 889 rr: RowRange{}, 890 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 891 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 892 }, 893 { 894 desc: "read with condition, strip values on false", 895 rr: RowRange{}, 896 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 897 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 898 }, 899 { 900 desc: "read with ValueRangeFilter + row limit", 901 rr: RowRange{}, 902 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 903 limit: LimitRows(2), 904 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 905 }, 906 { 907 desc: "read with ValueRangeFilter, no match on exclusive end", 908 rr: RowRange{}, 909 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 910 want: "", 911 }, 912 { 913 desc: "read with ValueRangeFilter, no matches", 914 rr: RowRange{}, 915 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 916 want: "", 917 }, 918 { 919 desc: "read with InterleaveFilter, no matches on all filters", 920 rr: RowRange{}, 921 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 922 want: "", 923 }, 924 { 925 desc: "read with InterleaveFilter, no duplicate cells", 926 rr: RowRange{}, 927 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 928 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 929 }, 930 { 931 desc: "read with InterleaveFilter, with duplicate cells", 932 rr: RowRange{}, 933 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 934 want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 935 }, 936 { 937 desc: "read with a RowRangeList and no filter", 938 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 939 want: "gwashington-jadams-1,wmckinley-tjefferson-1", 940 }, 941 { 942 desc: "chain that excludes rows and matches nothing, in a condition", 943 rr: RowRange{}, 944 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 945 want: "", 946 }, 947 { 948 desc: "chain that ends with an interleave that has no match. covers #804", 949 rr: RowRange{}, 950 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 951 want: "", 952 }, 953 } { 954 t.Run(test.desc, func(t *testing.T) { 955 var opts []ReadOption 956 if test.filter != nil { 957 opts = append(opts, RowFilter(test.filter)) 958 } 959 if test.limit != nil { 960 opts = append(opts, test.limit) 961 } 962 var elt []string 963 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 964 for _, ris := range r { 965 for _, ri := range ris { 966 elt = append(elt, formatReadItem(ri)) 967 } 968 } 969 return true 970 }, opts...) 971 if err != nil { 972 t.Fatal(err) 973 } 974 if got := strings.Join(elt, ","); got != test.want { 975 t.Fatalf("got %q\nwant %q", got, test.want) 976 } 977 }) 978 } 979} 980 981func TestIntegration_SampleRowKeys(t *testing.T) { 982 ctx := context.Background() 983 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 984 if err != nil { 985 t.Fatal(err) 986 } 987 defer cleanup() 988 989 // Insert some data. 990 initialData := map[string][]string{ 991 "wmckinley11": {"tjefferson11"}, 992 "gwashington77": {"jadams77"}, 993 "tjefferson0": {"gwashington0", "jadams0"}, 994 } 995 996 for row, ss := range initialData { 997 mut := NewMutation() 998 for _, name := range ss { 999 mut.Set("follows", name, 1000, []byte("1")) 1000 } 1001 if err := table.Apply(ctx, row, mut); err != nil { 1002 t.Fatalf("Mutating row %q: %v", row, err) 1003 } 1004 } 1005 sampleKeys, err := table.SampleRowKeys(context.Background()) 1006 if err != nil { 1007 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1008 } 1009 if len(sampleKeys) == 0 { 1010 t.Error("SampleRowKeys length 0") 1011 } 1012} 1013 1014func TestIntegration_Admin(t *testing.T) { 1015 testEnv, err := NewIntegrationEnv() 1016 if err != nil { 1017 t.Fatalf("IntegrationEnv: %v", err) 1018 } 1019 defer testEnv.Close() 1020 1021 timeout := 2 * time.Second 1022 if testEnv.Config().UseProd { 1023 timeout = 5 * time.Minute 1024 } 1025 ctx, _ := context.WithTimeout(context.Background(), timeout) 1026 1027 adminClient, err := testEnv.NewAdminClient() 1028 if err != nil { 1029 t.Fatalf("NewAdminClient: %v", err) 1030 } 1031 defer adminClient.Close() 1032 1033 iAdminClient, err := testEnv.NewInstanceAdminClient() 1034 if err != nil { 1035 t.Fatalf("NewInstanceAdminClient: %v", err) 1036 } 1037 if iAdminClient != nil { 1038 defer iAdminClient.Close() 1039 1040 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1041 if err != nil { 1042 t.Errorf("InstanceInfo: %v", err) 1043 } 1044 if iInfo.Name != adminClient.instance { 1045 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1046 } 1047 } 1048 1049 list := func() []string { 1050 tbls, err := adminClient.Tables(ctx) 1051 if err != nil { 1052 t.Fatalf("Fetching list of tables: %v", err) 1053 } 1054 sort.Strings(tbls) 1055 return tbls 1056 } 1057 containsAll := func(got, want []string) bool { 1058 gotSet := make(map[string]bool) 1059 1060 for _, s := range got { 1061 gotSet[s] = true 1062 } 1063 for _, s := range want { 1064 if !gotSet[s] { 1065 return false 1066 } 1067 } 1068 return true 1069 } 1070 1071 defer adminClient.DeleteTable(ctx, "mytable") 1072 1073 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1074 t.Fatalf("Creating table: %v", err) 1075 } 1076 1077 defer adminClient.DeleteTable(ctx, "myothertable") 1078 1079 if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { 1080 t.Fatalf("Creating table: %v", err) 1081 } 1082 1083 if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { 1084 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1085 } 1086 1087 must(adminClient.WaitForReplication(ctx, "mytable")) 1088 1089 if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { 1090 t.Fatalf("Deleting table: %v", err) 1091 } 1092 tables := list() 1093 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1094 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1095 } 1096 if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { 1097 t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted) 1098 } 1099 1100 tblConf := TableConf{ 1101 TableID: "conftable", 1102 Families: map[string]GCPolicy{ 1103 "fam1": MaxVersionsPolicy(1), 1104 "fam2": MaxVersionsPolicy(2), 1105 }, 1106 } 1107 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1108 t.Fatalf("Creating table from TableConf: %v", err) 1109 } 1110 defer adminClient.DeleteTable(ctx, tblConf.TableID) 1111 1112 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1113 if err != nil { 1114 t.Fatalf("Getting table info: %v", err) 1115 } 1116 sort.Strings(tblInfo.Families) 1117 wantFams := []string{"fam1", "fam2"} 1118 if !testutil.Equal(tblInfo.Families, wantFams) { 1119 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1120 } 1121 1122 // Populate mytable and drop row ranges 1123 if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { 1124 t.Fatalf("Creating column family: %v", err) 1125 } 1126 1127 client, err := testEnv.NewClient() 1128 if err != nil { 1129 t.Fatalf("NewClient: %v", err) 1130 } 1131 defer client.Close() 1132 1133 tbl := client.Open("mytable") 1134 1135 prefixes := []string{"a", "b", "c"} 1136 for _, prefix := range prefixes { 1137 for i := 0; i < 5; i++ { 1138 mut := NewMutation() 1139 mut.Set("cf", "col", 1000, []byte("1")) 1140 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1141 t.Fatalf("Mutating row: %v", err) 1142 } 1143 } 1144 } 1145 1146 if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { 1147 t.Errorf("DropRowRange a: %v", err) 1148 } 1149 if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { 1150 t.Errorf("DropRowRange c: %v", err) 1151 } 1152 if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { 1153 t.Errorf("DropRowRange x: %v", err) 1154 } 1155 1156 var gotRowCount int 1157 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1158 gotRowCount++ 1159 if !strings.HasPrefix(row.Key(), "b") { 1160 t.Errorf("Invalid row after dropping range: %v", row) 1161 } 1162 return true 1163 })) 1164 if gotRowCount != 5 { 1165 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1166 } 1167} 1168 1169func TestIntegration_AdminCreateInstance(t *testing.T) { 1170 if instanceToCreate == "" { 1171 t.Skip("instanceToCreate not set, skipping instance creation testing") 1172 } 1173 1174 testEnv, err := NewIntegrationEnv() 1175 if err != nil { 1176 t.Fatalf("IntegrationEnv: %v", err) 1177 } 1178 defer testEnv.Close() 1179 1180 if !testEnv.Config().UseProd { 1181 t.Skip("emulator doesn't support instance creation") 1182 } 1183 1184 timeout := 5 * time.Minute 1185 ctx, _ := context.WithTimeout(context.Background(), timeout) 1186 1187 iAdminClient, err := testEnv.NewInstanceAdminClient() 1188 if err != nil { 1189 t.Fatalf("NewInstanceAdminClient: %v", err) 1190 } 1191 defer iAdminClient.Close() 1192 1193 clusterID := instanceToCreate + "-cluster" 1194 1195 // Create a development instance 1196 conf := &InstanceConf{ 1197 InstanceId: instanceToCreate, 1198 ClusterId: clusterID, 1199 DisplayName: "test instance", 1200 Zone: instanceToCreateZone, 1201 InstanceType: DEVELOPMENT, 1202 } 1203 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1204 t.Fatalf("CreateInstance: %v", err) 1205 } 1206 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1207 1208 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1209 if err != nil { 1210 t.Fatalf("InstanceInfo: %v", err) 1211 } 1212 1213 // Basic return values are tested elsewhere, check instance type 1214 if iInfo.InstanceType != DEVELOPMENT { 1215 t.Fatalf("Instance is not DEVELOPMENT: %v", err) 1216 } 1217 1218 // Update everything we can about the instance in one call. 1219 confWithClusters := &InstanceWithClustersConfig{ 1220 InstanceID: instanceToCreate, 1221 DisplayName: "new display name", 1222 InstanceType: PRODUCTION, 1223 Clusters: []ClusterConfig{ 1224 {ClusterID: clusterID, NumNodes: 5, StorageType: HDD}}, 1225 } 1226 1227 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1228 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1229 } 1230 1231 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1232 if err != nil { 1233 t.Fatalf("InstanceInfo: %v", err) 1234 } 1235 1236 if iInfo.InstanceType != PRODUCTION { 1237 t.Fatalf("Instance type is not PRODUCTION: %v", err) 1238 } 1239 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1240 t.Fatalf("Display name: %q, want: %q", got, want) 1241 } 1242 1243 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1244 if err != nil { 1245 t.Fatalf("GetCluster: %v", err) 1246 } 1247 1248 if cInfo.ServeNodes != 5 { 1249 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1250 } 1251 1252 if cInfo.StorageType != HDD { 1253 t.Fatalf("StorageType: %v, want: %v", cInfo.StorageType, HDD) 1254 } 1255} 1256 1257func TestIntegration_AdminSnapshot(t *testing.T) { 1258 testEnv, err := NewIntegrationEnv() 1259 if err != nil { 1260 t.Fatalf("IntegrationEnv: %v", err) 1261 } 1262 defer testEnv.Close() 1263 1264 if !testEnv.Config().UseProd { 1265 t.Skip("emulator doesn't support snapshots") 1266 } 1267 1268 timeout := 2 * time.Second 1269 if testEnv.Config().UseProd { 1270 timeout = 5 * time.Minute 1271 } 1272 ctx, _ := context.WithTimeout(context.Background(), timeout) 1273 1274 adminClient, err := testEnv.NewAdminClient() 1275 if err != nil { 1276 t.Fatalf("NewAdminClient: %v", err) 1277 } 1278 defer adminClient.Close() 1279 1280 table := testEnv.Config().Table 1281 cluster := testEnv.Config().Cluster 1282 1283 list := func(cluster string) ([]*SnapshotInfo, error) { 1284 infos := []*SnapshotInfo(nil) 1285 1286 it := adminClient.Snapshots(ctx, cluster) 1287 for { 1288 s, err := it.Next() 1289 if err == iterator.Done { 1290 break 1291 } 1292 if err != nil { 1293 return nil, err 1294 } 1295 infos = append(infos, s) 1296 } 1297 return infos, err 1298 } 1299 1300 // Delete the table at the end of the test. Schedule ahead of time 1301 // in case the client fails 1302 defer adminClient.DeleteTable(ctx, table) 1303 1304 if err := adminClient.CreateTable(ctx, table); err != nil { 1305 t.Fatalf("Creating table: %v", err) 1306 } 1307 1308 // Precondition: no snapshots 1309 snapshots, err := list(cluster) 1310 if err != nil { 1311 t.Fatalf("Initial snapshot list: %v", err) 1312 } 1313 if got, want := len(snapshots), 0; got != want { 1314 t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) 1315 } 1316 1317 // Create snapshot 1318 defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") 1319 1320 if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { 1321 t.Fatalf("Creating snaphot: %v", err) 1322 } 1323 1324 // List snapshot 1325 snapshots, err = list(cluster) 1326 if err != nil { 1327 t.Fatalf("Listing snapshots: %v", err) 1328 } 1329 if got, want := len(snapshots), 1; got != want { 1330 t.Fatalf("Listing snapshot count: %d, want: %d", got, want) 1331 } 1332 if got, want := snapshots[0].Name, "mysnapshot"; got != want { 1333 t.Fatalf("Snapshot name: %s, want: %s", got, want) 1334 } 1335 if got, want := snapshots[0].SourceTable, table; got != want { 1336 t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) 1337 } 1338 if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 1339 t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) 1340 } 1341 1342 // Get snapshot 1343 snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") 1344 if err != nil { 1345 t.Fatalf("SnapshotInfo: %v", snapshot) 1346 } 1347 if got, want := *snapshot, *snapshots[0]; got != want { 1348 t.Fatalf("SnapshotInfo: %v, want: %v", got, want) 1349 } 1350 1351 // Restore 1352 restoredTable := table + "-restored" 1353 defer adminClient.DeleteTable(ctx, restoredTable) 1354 if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil { 1355 t.Fatalf("CreateTableFromSnapshot: %v", err) 1356 } 1357 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 1358 t.Fatalf("Restored TableInfo: %v", err) 1359 } 1360 1361 // Delete snapshot 1362 if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { 1363 t.Fatalf("DeleteSnapshot: %v", err) 1364 } 1365 snapshots, err = list(cluster) 1366 if err != nil { 1367 t.Fatalf("List after Delete: %v", err) 1368 } 1369 if got, want := len(snapshots), 0; got != want { 1370 t.Fatalf("List after delete len: %d, want: %d", got, want) 1371 } 1372} 1373 1374func TestIntegration_Granularity(t *testing.T) { 1375 testEnv, err := NewIntegrationEnv() 1376 if err != nil { 1377 t.Fatalf("IntegrationEnv: %v", err) 1378 } 1379 defer testEnv.Close() 1380 1381 timeout := 2 * time.Second 1382 if testEnv.Config().UseProd { 1383 timeout = 5 * time.Minute 1384 } 1385 ctx, _ := context.WithTimeout(context.Background(), timeout) 1386 ctx = mergeOutgoingMetadata(ctx, withGoogleClientInfo()) 1387 1388 adminClient, err := testEnv.NewAdminClient() 1389 if err != nil { 1390 t.Fatalf("NewAdminClient: %v", err) 1391 } 1392 defer adminClient.Close() 1393 1394 list := func() []string { 1395 tbls, err := adminClient.Tables(ctx) 1396 if err != nil { 1397 t.Fatalf("Fetching list of tables: %v", err) 1398 } 1399 sort.Strings(tbls) 1400 return tbls 1401 } 1402 containsAll := func(got, want []string) bool { 1403 gotSet := make(map[string]bool) 1404 1405 for _, s := range got { 1406 gotSet[s] = true 1407 } 1408 for _, s := range want { 1409 if !gotSet[s] { 1410 return false 1411 } 1412 } 1413 return true 1414 } 1415 1416 defer adminClient.DeleteTable(ctx, "mytable") 1417 1418 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1419 t.Fatalf("Creating table: %v", err) 1420 } 1421 1422 tables := list() 1423 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1424 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1425 } 1426 1427 // calling ModifyColumnFamilies to check the granularity of table 1428 prefix := adminClient.instancePrefix() 1429 req := &btapb.ModifyColumnFamiliesRequest{ 1430 Name: prefix + "/tables/" + "mytable", 1431 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 1432 Id: "cf", 1433 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, 1434 }}, 1435 } 1436 table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req) 1437 if err != nil { 1438 t.Fatalf("Creating column family: %v", err) 1439 } 1440 if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { 1441 t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) 1442 } 1443} 1444 1445func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { 1446 testEnv, err := NewIntegrationEnv() 1447 if err != nil { 1448 t.Fatalf("IntegrationEnv: %v", err) 1449 } 1450 defer testEnv.Close() 1451 1452 timeout := 2 * time.Second 1453 if testEnv.Config().UseProd { 1454 timeout = 5 * time.Minute 1455 } 1456 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1457 defer cancel() 1458 1459 adminClient, err := testEnv.NewAdminClient() 1460 if err != nil { 1461 t.Fatalf("NewAdminClient: %v", err) 1462 } 1463 defer adminClient.Close() 1464 1465 iAdminClient, err := testEnv.NewInstanceAdminClient() 1466 if err != nil { 1467 t.Fatalf("NewInstanceAdminClient: %v", err) 1468 } 1469 1470 if iAdminClient == nil { 1471 return 1472 } 1473 1474 defer iAdminClient.Close() 1475 profile := ProfileConf{ 1476 ProfileID: "app_profile1", 1477 InstanceID: adminClient.instance, 1478 ClusterID: testEnv.Config().Cluster, 1479 Description: "creating new app profile 1", 1480 RoutingPolicy: SingleClusterRouting, 1481 } 1482 1483 createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) 1484 if err != nil { 1485 t.Fatalf("Creating app profile: %v", err) 1486 1487 } 1488 1489 gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1490 1491 if err != nil { 1492 t.Fatalf("Get app profile: %v", err) 1493 } 1494 1495 if !proto.Equal(createdProfile, gotProfile) { 1496 t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) 1497 1498 } 1499 1500 list := func(instanceID string) ([]*btapb.AppProfile, error) { 1501 profiles := []*btapb.AppProfile(nil) 1502 1503 it := iAdminClient.ListAppProfiles(ctx, instanceID) 1504 for { 1505 s, err := it.Next() 1506 if err == iterator.Done { 1507 break 1508 } 1509 if err != nil { 1510 return nil, err 1511 } 1512 profiles = append(profiles, s) 1513 } 1514 return profiles, err 1515 } 1516 1517 profiles, err := list(adminClient.instance) 1518 if err != nil { 1519 t.Fatalf("List app profile: %v", err) 1520 } 1521 1522 if got, want := len(profiles), 1; got != want { 1523 t.Fatalf("Initial app profile list len: %d, want: %d", got, want) 1524 } 1525 1526 for _, test := range []struct { 1527 desc string 1528 uattrs ProfileAttrsToUpdate 1529 want *btapb.AppProfile // nil means error 1530 }{ 1531 { 1532 desc: "empty update", 1533 uattrs: ProfileAttrsToUpdate{}, 1534 want: nil, 1535 }, 1536 1537 { 1538 desc: "empty description update", 1539 uattrs: ProfileAttrsToUpdate{Description: ""}, 1540 want: &btapb.AppProfile{ 1541 Name: gotProfile.Name, 1542 Description: "", 1543 RoutingPolicy: gotProfile.RoutingPolicy, 1544 Etag: gotProfile.Etag}, 1545 }, 1546 { 1547 desc: "routing update", 1548 uattrs: ProfileAttrsToUpdate{ 1549 RoutingPolicy: SingleClusterRouting, 1550 ClusterID: testEnv.Config().Cluster, 1551 }, 1552 want: &btapb.AppProfile{ 1553 Name: gotProfile.Name, 1554 Description: "", 1555 Etag: gotProfile.Etag, 1556 RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ 1557 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1558 ClusterId: testEnv.Config().Cluster, 1559 }}, 1560 }, 1561 }, 1562 } { 1563 err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) 1564 if err != nil { 1565 if test.want != nil { 1566 t.Errorf("%s: %v", test.desc, err) 1567 } 1568 continue 1569 } 1570 if err == nil && test.want == nil { 1571 t.Errorf("%s: got nil, want error", test.desc) 1572 continue 1573 } 1574 1575 got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1576 1577 if !proto.Equal(got, test.want) { 1578 t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want) 1579 } 1580 1581 } 1582 1583 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 1584 if err != nil { 1585 t.Fatalf("Delete app profile: %v", err) 1586 } 1587 1588} 1589 1590func TestIntegration_InstanceUpdate(t *testing.T) { 1591 testEnv, err := NewIntegrationEnv() 1592 if err != nil { 1593 t.Fatalf("IntegrationEnv: %v", err) 1594 } 1595 defer testEnv.Close() 1596 1597 timeout := 2 * time.Second 1598 if testEnv.Config().UseProd { 1599 timeout = 5 * time.Minute 1600 } 1601 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1602 defer cancel() 1603 1604 adminClient, err := testEnv.NewAdminClient() 1605 if err != nil { 1606 t.Fatalf("NewAdminClient: %v", err) 1607 } 1608 1609 defer adminClient.Close() 1610 1611 iAdminClient, err := testEnv.NewInstanceAdminClient() 1612 if err != nil { 1613 t.Fatalf("NewInstanceAdminClient: %v", err) 1614 } 1615 1616 if iAdminClient == nil { 1617 return 1618 } 1619 1620 defer iAdminClient.Close() 1621 1622 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1623 if err != nil { 1624 t.Errorf("InstanceInfo: %v", err) 1625 } 1626 1627 if iInfo.Name != adminClient.instance { 1628 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1629 } 1630 1631 if iInfo.DisplayName != adminClient.instance { 1632 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1633 } 1634 1635 const numNodes = 4 1636 // update cluster nodes 1637 if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { 1638 t.Errorf("UpdateCluster: %v", err) 1639 } 1640 1641 // get cluster after updating 1642 cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster) 1643 if err != nil { 1644 t.Errorf("GetCluster %v", err) 1645 } 1646 if cis.ServeNodes != int(numNodes) { 1647 t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes)) 1648 } 1649} 1650 1651func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { 1652 testEnv, err := NewIntegrationEnv() 1653 if err != nil { 1654 return nil, nil, nil, "", nil, err 1655 } 1656 1657 var timeout time.Duration 1658 if testEnv.Config().UseProd { 1659 timeout = 10 * time.Minute 1660 t.Logf("Running test against production") 1661 } else { 1662 timeout = 1 * time.Minute 1663 t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) 1664 } 1665 ctx, cancel := context.WithTimeout(ctx, timeout) 1666 defer cancel() 1667 1668 client, err := testEnv.NewClient() 1669 if err != nil { 1670 return nil, nil, nil, "", nil, err 1671 } 1672 1673 adminClient, err := testEnv.NewAdminClient() 1674 if err != nil { 1675 return nil, nil, nil, "", nil, err 1676 } 1677 1678 tableName = testEnv.Config().Table 1679 if err := adminClient.CreateTable(ctx, tableName); err != nil { 1680 return nil, nil, nil, "", nil, err 1681 } 1682 if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { 1683 return nil, nil, nil, "", nil, err 1684 } 1685 1686 return client, adminClient, client.Open(tableName), tableName, func() { 1687 adminClient.DeleteTable(ctx, tableName) 1688 client.Close() 1689 adminClient.Close() 1690 }, nil 1691} 1692 1693func formatReadItem(ri ReadItem) string { 1694 // Use the column qualifier only to make the test data briefer. 1695 col := ri.Column[strings.Index(ri.Column, ":")+1:] 1696 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 1697} 1698 1699func fill(b, sub []byte) { 1700 for len(b) > len(sub) { 1701 n := copy(b, sub) 1702 b = b[n:] 1703 } 1704} 1705 1706func clearTimestamps(r Row) { 1707 for _, ris := range r { 1708 for i := range ris { 1709 ris[i].Timestamp = 0 1710 } 1711 } 1712} 1713