1/* 2Copyright 2019 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "context" 21 "flag" 22 "fmt" 23 "math" 24 "math/rand" 25 "sort" 26 "strings" 27 "sync" 28 "testing" 29 "time" 30 31 "cloud.google.com/go/internal/testutil" 32 "github.com/golang/protobuf/proto" 33 "google.golang.org/api/iterator" 34 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 35) 36 37var presidentsSocialGraph = map[string][]string{ 38 "wmckinley": {"tjefferson"}, 39 "gwashington": {"jadams"}, 40 "tjefferson": {"gwashington", "jadams"}, 41 "jadams": {"gwashington", "tjefferson"}, 42} 43 44func populatePresidentsGraph(table *Table) error { 45 ctx := context.Background() 46 for row, ss := range presidentsSocialGraph { 47 mut := NewMutation() 48 for _, name := range ss { 49 mut.Set("follows", name, 1000, []byte("1")) 50 } 51 if err := table.Apply(ctx, row, mut); err != nil { 52 return fmt.Errorf("Mutating row %q: %v", row, err) 53 } 54 } 55 return nil 56} 57 58var instanceToCreate string 59var instanceToCreateZone string 60 61func init() { 62 // Don't test instance creation by default, as quota is necessary and aborted tests could strand resources. 63 flag.StringVar(&instanceToCreate, "it.instance-to-create", "", 64 "The id of an instance to create, update and delete. Requires sufficient Cloud Bigtable quota. Requires that it.use-prod is true.") 65 flag.StringVar(&instanceToCreateZone, "it.instance-to-create-zone", "us-central1-b", 66 "The zone in which to create the new test instance.") 67} 68 69func TestIntegration_ConditionalMutations(t *testing.T) { 70 ctx := context.Background() 71 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 72 if err != nil { 73 t.Fatal(err) 74 } 75 defer cleanup() 76 77 if err := populatePresidentsGraph(table); err != nil { 78 t.Fatal(err) 79 } 80 81 // Do a conditional mutation with a complex filter. 82 mutTrue := NewMutation() 83 mutTrue.Set("follows", "wmckinley", 1000, []byte("1")) 84 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 85 mut := NewCondMutation(filter, mutTrue, nil) 86 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 87 t.Fatalf("Conditionally mutating row: %v", err) 88 } 89 // Do a second condition mutation with a filter that does not match, 90 // and thus no changes should be made. 91 mutTrue = NewMutation() 92 mutTrue.DeleteRow() 93 filter = ColumnFilter("snoop.dogg") 94 mut = NewCondMutation(filter, mutTrue, nil) 95 if err := table.Apply(ctx, "tjefferson", mut); err != nil { 96 t.Fatalf("Conditionally mutating row: %v", err) 97 } 98 99 // Fetch a row. 100 row, err := table.ReadRow(ctx, "jadams") 101 if err != nil { 102 t.Fatalf("Reading a row: %v", err) 103 } 104 wantRow := Row{ 105 "follows": []ReadItem{ 106 {Row: "jadams", Column: "follows:gwashington", Timestamp: 1000, Value: []byte("1")}, 107 {Row: "jadams", Column: "follows:tjefferson", Timestamp: 1000, Value: []byte("1")}, 108 }, 109 } 110 if !testutil.Equal(row, wantRow) { 111 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 112 } 113} 114 115func TestIntegration_PartialReadRows(t *testing.T) { 116 ctx := context.Background() 117 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 118 if err != nil { 119 t.Fatal(err) 120 } 121 defer cleanup() 122 123 if err := populatePresidentsGraph(table); err != nil { 124 t.Fatal(err) 125 } 126 127 // Do a scan and stop part way through. 128 // Verify that the ReadRows callback doesn't keep running. 129 stopped := false 130 err = table.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 131 if r.Key() < "h" { 132 return true 133 } 134 if !stopped { 135 stopped = true 136 return false 137 } 138 t.Fatalf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 139 return false 140 }) 141 if err != nil { 142 t.Fatalf("Partial ReadRows: %v", err) 143 } 144} 145 146func TestIntegration_ReadRowList(t *testing.T) { 147 ctx := context.Background() 148 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 149 if err != nil { 150 t.Fatal(err) 151 } 152 defer cleanup() 153 154 if err := populatePresidentsGraph(table); err != nil { 155 t.Fatal(err) 156 } 157 158 // Read a RowList 159 var elt []string 160 keys := RowList{"wmckinley", "gwashington", "jadams"} 161 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 162 err = table.ReadRows(ctx, keys, func(r Row) bool { 163 for _, ris := range r { 164 for _, ri := range ris { 165 elt = append(elt, formatReadItem(ri)) 166 } 167 } 168 return true 169 }) 170 if err != nil { 171 t.Fatalf("read RowList: %v", err) 172 } 173 174 if got := strings.Join(elt, ","); got != want { 175 t.Fatalf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 176 } 177} 178 179func TestIntegration_DeleteRow(t *testing.T) { 180 ctx := context.Background() 181 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 182 if err != nil { 183 t.Fatal(err) 184 } 185 defer cleanup() 186 187 if err := populatePresidentsGraph(table); err != nil { 188 t.Fatal(err) 189 } 190 191 // Delete a row and check it goes away. 192 mut := NewMutation() 193 mut.DeleteRow() 194 if err := table.Apply(ctx, "wmckinley", mut); err != nil { 195 t.Fatalf("Apply DeleteRow: %v", err) 196 } 197 row, err := table.ReadRow(ctx, "wmckinley") 198 if err != nil { 199 t.Fatalf("Reading a row after DeleteRow: %v", err) 200 } 201 if len(row) != 0 { 202 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 203 } 204} 205 206func TestIntegration_ReadModifyWrite(t *testing.T) { 207 ctx := context.Background() 208 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 209 if err != nil { 210 t.Fatal(err) 211 } 212 defer cleanup() 213 214 if err := populatePresidentsGraph(table); err != nil { 215 t.Fatal(err) 216 } 217 218 if err := adminClient.CreateColumnFamily(ctx, tableName, "counter"); err != nil { 219 t.Fatalf("Creating column family: %v", err) 220 } 221 222 appendRMW := func(b []byte) *ReadModifyWrite { 223 rmw := NewReadModifyWrite() 224 rmw.AppendValue("counter", "likes", b) 225 return rmw 226 } 227 incRMW := func(n int64) *ReadModifyWrite { 228 rmw := NewReadModifyWrite() 229 rmw.Increment("counter", "likes", n) 230 return rmw 231 } 232 rmwSeq := []struct { 233 desc string 234 rmw *ReadModifyWrite 235 want []byte 236 }{ 237 { 238 desc: "append #1", 239 rmw: appendRMW([]byte{0, 0, 0}), 240 want: []byte{0, 0, 0}, 241 }, 242 { 243 desc: "append #2", 244 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 245 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 246 }, 247 { 248 desc: "increment", 249 rmw: incRMW(8), 250 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 251 }, 252 } 253 for _, step := range rmwSeq { 254 row, err := table.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 255 if err != nil { 256 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 257 } 258 // Make sure the modified cell returned by the RMW operation has a timestamp. 259 if row["counter"][0].Timestamp == 0 { 260 t.Fatalf("RMW returned cell timestamp: got %v, want > 0", row["counter"][0].Timestamp) 261 } 262 clearTimestamps(row) 263 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 264 if !testutil.Equal(row, wantRow) { 265 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 266 } 267 } 268 269 // Check for google-cloud-go/issues/723. RMWs that insert new rows should keep row order sorted in the emulator. 270 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-2", appendRMW([]byte{0})) 271 if err != nil { 272 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 273 } 274 _, err = table.ApplyReadModifyWrite(ctx, "issue-723-1", appendRMW([]byte{0})) 275 if err != nil { 276 t.Fatalf("ApplyReadModifyWrite null string: %v", err) 277 } 278 // Get only the correct row back on read. 279 r, err := table.ReadRow(ctx, "issue-723-1") 280 if err != nil { 281 t.Fatalf("Reading row: %v", err) 282 } 283 if r.Key() != "issue-723-1" { 284 t.Fatalf("ApplyReadModifyWrite: incorrect read after RMW,\n got %v\nwant %v", r.Key(), "issue-723-1") 285 } 286} 287 288func TestIntegration_ArbitraryTimestamps(t *testing.T) { 289 ctx := context.Background() 290 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 291 if err != nil { 292 t.Fatal(err) 293 } 294 defer cleanup() 295 296 // Test arbitrary timestamps more thoroughly. 297 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 298 t.Fatalf("Creating column family: %v", err) 299 } 300 const numVersions = 4 301 mut := NewMutation() 302 for i := 1; i < numVersions; i++ { 303 // Timestamps are used in thousands because the server 304 // only permits that granularity. 305 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 306 mut.Set("ts", "col2", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 307 } 308 if err := table.Apply(ctx, "testrow", mut); err != nil { 309 t.Fatalf("Mutating row: %v", err) 310 } 311 r, err := table.ReadRow(ctx, "testrow") 312 if err != nil { 313 t.Fatalf("Reading row: %v", err) 314 } 315 wantRow := Row{"ts": []ReadItem{ 316 // These should be returned in descending timestamp order. 317 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 318 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 319 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 320 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 321 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 322 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 323 }} 324 if !testutil.Equal(r, wantRow) { 325 t.Fatalf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 326 } 327 328 // Do the same read, but filter to the latest two versions. 329 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 330 if err != nil { 331 t.Fatalf("Reading row: %v", err) 332 } 333 wantRow = Row{"ts": []ReadItem{ 334 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 335 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 336 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 337 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 338 }} 339 if !testutil.Equal(r, wantRow) { 340 t.Fatalf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 341 } 342 // Check cell offset / limit 343 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowLimitFilter(3))) 344 if err != nil { 345 t.Fatalf("Reading row: %v", err) 346 } 347 wantRow = Row{"ts": []ReadItem{ 348 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 349 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 350 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 351 }} 352 if !testutil.Equal(r, wantRow) { 353 t.Fatalf("Cell with multiple versions and CellsPerRowLimitFilter(3),\n got %v\nwant %v", r, wantRow) 354 } 355 r, err = table.ReadRow(ctx, "testrow", RowFilter(CellsPerRowOffsetFilter(3))) 356 if err != nil { 357 t.Fatalf("Reading row: %v", err) 358 } 359 wantRow = Row{"ts": []ReadItem{ 360 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 361 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 362 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 363 }} 364 if !testutil.Equal(r, wantRow) { 365 t.Fatalf("Cell with multiple versions and CellsPerRowOffsetFilter(3),\n got %v\nwant %v", r, wantRow) 366 } 367 // Check timestamp range filtering (with truncation) 368 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1001, 3000))) 369 if err != nil { 370 t.Fatalf("Reading row: %v", err) 371 } 372 wantRow = Row{"ts": []ReadItem{ 373 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 374 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 375 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 376 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 377 }} 378 if !testutil.Equal(r, wantRow) { 379 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 3000),\n got %v\nwant %v", r, wantRow) 380 } 381 r, err = table.ReadRow(ctx, "testrow", RowFilter(TimestampRangeFilterMicros(1000, 0))) 382 if err != nil { 383 t.Fatalf("Reading row: %v", err) 384 } 385 wantRow = Row{"ts": []ReadItem{ 386 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 387 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 388 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 389 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 390 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 391 {Row: "testrow", Column: "ts:col2", Timestamp: 1000, Value: []byte("val-1")}, 392 }} 393 if !testutil.Equal(r, wantRow) { 394 t.Fatalf("Cell with multiple versions and TimestampRangeFilter(1000, 0),\n got %v\nwant %v", r, wantRow) 395 } 396 // Delete non-existing cells, no such column family in this row 397 // Should not delete anything 398 if err := adminClient.CreateColumnFamily(ctx, tableName, "non-existing"); err != nil { 399 t.Fatalf("Creating column family: %v", err) 400 } 401 mut = NewMutation() 402 mut.DeleteTimestampRange("non-existing", "col", 2000, 3000) // half-open interval 403 if err := table.Apply(ctx, "testrow", mut); err != nil { 404 t.Fatalf("Mutating row: %v", err) 405 } 406 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 407 if err != nil { 408 t.Fatalf("Reading row: %v", err) 409 } 410 if !testutil.Equal(r, wantRow) { 411 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 412 } 413 // Delete non-existing cells, no such column in this column family 414 // Should not delete anything 415 mut = NewMutation() 416 mut.DeleteTimestampRange("ts", "non-existing", 2000, 3000) // half-open interval 417 if err := table.Apply(ctx, "testrow", mut); err != nil { 418 t.Fatalf("Mutating row: %v", err) 419 } 420 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(3))) 421 if err != nil { 422 t.Fatalf("Reading row: %v", err) 423 } 424 if !testutil.Equal(r, wantRow) { 425 t.Fatalf("Cell was deleted unexpectly,\n got %v\nwant %v", r, wantRow) 426 } 427 // Delete the cell with timestamp 2000 and repeat the last read, 428 // checking that we get ts 3000 and ts 1000. 429 mut = NewMutation() 430 mut.DeleteTimestampRange("ts", "col", 2001, 3000) // half-open interval 431 if err := table.Apply(ctx, "testrow", mut); err != nil { 432 t.Fatalf("Mutating row: %v", err) 433 } 434 r, err = table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 435 if err != nil { 436 t.Fatalf("Reading row: %v", err) 437 } 438 wantRow = Row{"ts": []ReadItem{ 439 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 440 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 441 {Row: "testrow", Column: "ts:col2", Timestamp: 3000, Value: []byte("val-3")}, 442 {Row: "testrow", Column: "ts:col2", Timestamp: 2000, Value: []byte("val-2")}, 443 }} 444 if !testutil.Equal(r, wantRow) { 445 t.Fatalf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 446 } 447 448 // Check DeleteCellsInFamily 449 if err := adminClient.CreateColumnFamily(ctx, tableName, "status"); err != nil { 450 t.Fatalf("Creating column family: %v", err) 451 } 452 453 mut = NewMutation() 454 mut.Set("status", "start", 2000, []byte("2")) 455 mut.Set("status", "end", 3000, []byte("3")) 456 mut.Set("ts", "col", 1000, []byte("1")) 457 if err := table.Apply(ctx, "row1", mut); err != nil { 458 t.Fatalf("Mutating row: %v", err) 459 } 460 if err := table.Apply(ctx, "row2", mut); err != nil { 461 t.Fatalf("Mutating row: %v", err) 462 } 463 464 mut = NewMutation() 465 mut.DeleteCellsInFamily("status") 466 if err := table.Apply(ctx, "row1", mut); err != nil { 467 t.Fatalf("Delete cf: %v", err) 468 } 469 470 // ColumnFamily removed 471 r, err = table.ReadRow(ctx, "row1") 472 if err != nil { 473 t.Fatalf("Reading row: %v", err) 474 } 475 wantRow = Row{"ts": []ReadItem{ 476 {Row: "row1", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 477 }} 478 if !testutil.Equal(r, wantRow) { 479 t.Fatalf("column family was not deleted.\n got %v\n want %v", r, wantRow) 480 } 481 482 // ColumnFamily not removed 483 r, err = table.ReadRow(ctx, "row2") 484 if err != nil { 485 t.Fatalf("Reading row: %v", err) 486 } 487 wantRow = Row{ 488 "ts": []ReadItem{ 489 {Row: "row2", Column: "ts:col", Timestamp: 1000, Value: []byte("1")}, 490 }, 491 "status": []ReadItem{ 492 {Row: "row2", Column: "status:end", Timestamp: 3000, Value: []byte("3")}, 493 {Row: "row2", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 494 }, 495 } 496 if !testutil.Equal(r, wantRow) { 497 t.Fatalf("Column family was deleted unexpectedly.\n got %v\n want %v", r, wantRow) 498 } 499 500 // Check DeleteCellsInColumn 501 mut = NewMutation() 502 mut.Set("status", "start", 2000, []byte("2")) 503 mut.Set("status", "middle", 3000, []byte("3")) 504 mut.Set("status", "end", 1000, []byte("1")) 505 if err := table.Apply(ctx, "row3", mut); err != nil { 506 t.Fatalf("Mutating row: %v", err) 507 } 508 mut = NewMutation() 509 mut.DeleteCellsInColumn("status", "middle") 510 if err := table.Apply(ctx, "row3", mut); err != nil { 511 t.Fatalf("Delete column: %v", err) 512 } 513 r, err = table.ReadRow(ctx, "row3") 514 if err != nil { 515 t.Fatalf("Reading row: %v", err) 516 } 517 wantRow = Row{ 518 "status": []ReadItem{ 519 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 520 {Row: "row3", Column: "status:start", Timestamp: 2000, Value: []byte("2")}, 521 }, 522 } 523 if !testutil.Equal(r, wantRow) { 524 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 525 } 526 mut = NewMutation() 527 mut.DeleteCellsInColumn("status", "start") 528 if err := table.Apply(ctx, "row3", mut); err != nil { 529 t.Fatalf("Delete column: %v", err) 530 } 531 r, err = table.ReadRow(ctx, "row3") 532 if err != nil { 533 t.Fatalf("Reading row: %v", err) 534 } 535 wantRow = Row{ 536 "status": []ReadItem{ 537 {Row: "row3", Column: "status:end", Timestamp: 1000, Value: []byte("1")}, 538 }, 539 } 540 if !testutil.Equal(r, wantRow) { 541 t.Fatalf("Column was not deleted.\n got %v\n want %v", r, wantRow) 542 } 543 mut = NewMutation() 544 mut.DeleteCellsInColumn("status", "end") 545 if err := table.Apply(ctx, "row3", mut); err != nil { 546 t.Fatalf("Delete column: %v", err) 547 } 548 r, err = table.ReadRow(ctx, "row3") 549 if err != nil { 550 t.Fatalf("Reading row: %v", err) 551 } 552 if len(r) != 0 { 553 t.Fatalf("Delete column: got %v, want empty row", r) 554 } 555 // Add same cell after delete 556 mut = NewMutation() 557 mut.Set("status", "end", 1000, []byte("1")) 558 if err := table.Apply(ctx, "row3", mut); err != nil { 559 t.Fatalf("Mutating row: %v", err) 560 } 561 r, err = table.ReadRow(ctx, "row3") 562 if err != nil { 563 t.Fatalf("Reading row: %v", err) 564 } 565 if !testutil.Equal(r, wantRow) { 566 t.Fatalf("Column was not deleted correctly.\n got %v\n want %v", r, wantRow) 567 } 568} 569 570func TestIntegration_HighlyConcurrentReadsAndWrites(t *testing.T) { 571 ctx := context.Background() 572 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 573 if err != nil { 574 t.Fatal(err) 575 } 576 defer cleanup() 577 578 if err := populatePresidentsGraph(table); err != nil { 579 t.Fatal(err) 580 } 581 582 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 583 t.Fatalf("Creating column family: %v", err) 584 } 585 586 // Do highly concurrent reads/writes. 587 const maxConcurrency = 1000 588 var wg sync.WaitGroup 589 for i := 0; i < maxConcurrency; i++ { 590 wg.Add(1) 591 go func() { 592 defer wg.Done() 593 switch r := rand.Intn(100); { // r ∈ [0,100) 594 case 0 <= r && r < 30: 595 // Do a read. 596 _, err := table.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 597 if err != nil { 598 t.Errorf("Concurrent read: %v", err) 599 } 600 case 30 <= r && r < 100: 601 // Do a write. 602 mut := NewMutation() 603 mut.Set("ts", "col", 1000, []byte("data")) 604 if err := table.Apply(ctx, "testrow", mut); err != nil { 605 t.Errorf("Concurrent write: %v", err) 606 } 607 } 608 }() 609 } 610 wg.Wait() 611} 612 613func TestIntegration_LargeReadsWritesAndScans(t *testing.T) { 614 ctx := context.Background() 615 _, adminClient, table, tableName, cleanup, err := setupIntegration(ctx, t) 616 if err != nil { 617 t.Fatal(err) 618 } 619 defer cleanup() 620 621 if err := adminClient.CreateColumnFamily(ctx, tableName, "ts"); err != nil { 622 t.Fatalf("Creating column family: %v", err) 623 } 624 625 bigBytes := make([]byte, 5<<20) // 5 MB is larger than current default gRPC max of 4 MB, but less than the max we set. 626 nonsense := []byte("lorem ipsum dolor sit amet, ") 627 fill(bigBytes, nonsense) 628 mut := NewMutation() 629 mut.Set("ts", "col", 1000, bigBytes) 630 if err := table.Apply(ctx, "bigrow", mut); err != nil { 631 t.Fatalf("Big write: %v", err) 632 } 633 r, err := table.ReadRow(ctx, "bigrow") 634 if err != nil { 635 t.Fatalf("Big read: %v", err) 636 } 637 wantRow := Row{"ts": []ReadItem{ 638 {Row: "bigrow", Column: "ts:col", Timestamp: 1000, Value: bigBytes}, 639 }} 640 if !testutil.Equal(r, wantRow) { 641 t.Fatalf("Big read returned incorrect bytes: %v", r) 642 } 643 644 var wg sync.WaitGroup 645 // Now write 1000 rows, each with 82 KB values, then scan them all. 646 medBytes := make([]byte, 82<<10) 647 fill(medBytes, nonsense) 648 sem := make(chan int, 50) // do up to 50 mutations at a time. 649 for i := 0; i < 1000; i++ { 650 mut := NewMutation() 651 mut.Set("ts", "big-scan", 1000, medBytes) 652 row := fmt.Sprintf("row-%d", i) 653 wg.Add(1) 654 go func() { 655 defer wg.Done() 656 defer func() { <-sem }() 657 sem <- 1 658 if err := table.Apply(ctx, row, mut); err != nil { 659 t.Errorf("Preparing large scan: %v", err) 660 } 661 }() 662 } 663 wg.Wait() 664 n := 0 665 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 666 for _, ris := range r { 667 for _, ri := range ris { 668 n += len(ri.Value) 669 } 670 } 671 return true 672 }, RowFilter(ColumnFilter("big-scan"))) 673 if err != nil { 674 t.Fatalf("Doing large scan: %v", err) 675 } 676 if want := 1000 * len(medBytes); n != want { 677 t.Fatalf("Large scan returned %d bytes, want %d", n, want) 678 } 679 // Scan a subset of the 1000 rows that we just created, using a LimitRows ReadOption. 680 rc := 0 681 wantRc := 3 682 err = table.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 683 rc++ 684 return true 685 }, LimitRows(int64(wantRc))) 686 if err != nil { 687 t.Fatal(err) 688 } 689 if rc != wantRc { 690 t.Fatalf("Scan with row limit returned %d rows, want %d", rc, wantRc) 691 } 692 693 // Test bulk mutations 694 if err := adminClient.CreateColumnFamily(ctx, tableName, "bulk"); err != nil { 695 t.Fatalf("Creating column family: %v", err) 696 } 697 bulkData := map[string][]string{ 698 "red sox": {"2004", "2007", "2013"}, 699 "patriots": {"2001", "2003", "2004", "2014"}, 700 "celtics": {"1981", "1984", "1986", "2008"}, 701 } 702 var rowKeys []string 703 var muts []*Mutation 704 for row, ss := range bulkData { 705 mut := NewMutation() 706 for _, name := range ss { 707 mut.Set("bulk", name, 1000, []byte("1")) 708 } 709 rowKeys = append(rowKeys, row) 710 muts = append(muts, mut) 711 } 712 status, err := table.ApplyBulk(ctx, rowKeys, muts) 713 if err != nil { 714 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 715 } 716 if status != nil { 717 t.Fatalf("non-nil errors: %v", err) 718 } 719 720 // Read each row back 721 for rowKey, ss := range bulkData { 722 row, err := table.ReadRow(ctx, rowKey) 723 if err != nil { 724 t.Fatalf("Reading a bulk row: %v", err) 725 } 726 var wantItems []ReadItem 727 for _, val := range ss { 728 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Timestamp: 1000, Value: []byte("1")}) 729 } 730 wantRow := Row{"bulk": wantItems} 731 if !testutil.Equal(row, wantRow) { 732 t.Fatalf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 733 } 734 } 735 736 // Test bulk write errors. 737 // Note: Setting timestamps as ServerTime makes sure the mutations are not retried on error. 738 badMut := NewMutation() 739 badMut.Set("badfamily", "col", ServerTime, nil) 740 badMut2 := NewMutation() 741 badMut2.Set("badfamily2", "goodcol", ServerTime, []byte("1")) 742 status, err = table.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 743 if err != nil { 744 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 745 } 746 if status == nil { 747 t.Fatalf("No errors for bad bulk mutation") 748 } else if status[0] == nil || status[1] == nil { 749 t.Fatalf("No error for bad bulk mutation") 750 } 751} 752 753func TestIntegration_Read(t *testing.T) { 754 ctx := context.Background() 755 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 756 if err != nil { 757 t.Fatal(err) 758 } 759 defer cleanup() 760 761 // Insert some data. 762 initialData := map[string][]string{ 763 "wmckinley": {"tjefferson"}, 764 "gwashington": {"jadams"}, 765 "tjefferson": {"gwashington", "jadams", "wmckinley"}, 766 "jadams": {"gwashington", "tjefferson"}, 767 } 768 for row, ss := range initialData { 769 mut := NewMutation() 770 for _, name := range ss { 771 mut.Set("follows", name, 1000, []byte("1")) 772 } 773 if err := table.Apply(ctx, row, mut); err != nil { 774 t.Fatalf("Mutating row %q: %v", row, err) 775 } 776 } 777 778 for _, test := range []struct { 779 desc string 780 rr RowSet 781 filter Filter // may be nil 782 limit ReadOption // may be nil 783 784 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 785 // and join with a comma. 786 want string 787 }{ 788 { 789 desc: "read all, unfiltered", 790 rr: RowRange{}, 791 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 792 }, 793 { 794 desc: "read with InfiniteRange, unfiltered", 795 rr: InfiniteRange("tjefferson"), 796 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 797 }, 798 { 799 desc: "read with NewRange, unfiltered", 800 rr: NewRange("gargamel", "hubbard"), 801 want: "gwashington-jadams-1", 802 }, 803 { 804 desc: "read with PrefixRange, unfiltered", 805 rr: PrefixRange("jad"), 806 want: "jadams-gwashington-1,jadams-tjefferson-1", 807 }, 808 { 809 desc: "read with SingleRow, unfiltered", 810 rr: SingleRow("wmckinley"), 811 want: "wmckinley-tjefferson-1", 812 }, 813 { 814 desc: "read all, with ColumnFilter", 815 rr: RowRange{}, 816 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 817 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 818 }, 819 { 820 desc: "read all, with ColumnFilter, prefix", 821 rr: RowRange{}, 822 filter: ColumnFilter("j"), // no matches 823 want: "", 824 }, 825 { 826 desc: "read range, with ColumnRangeFilter", 827 rr: RowRange{}, 828 filter: ColumnRangeFilter("follows", "h", "k"), 829 want: "gwashington-jadams-1,tjefferson-jadams-1", 830 }, 831 { 832 desc: "read range from empty, with ColumnRangeFilter", 833 rr: RowRange{}, 834 filter: ColumnRangeFilter("follows", "", "u"), 835 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 836 }, 837 { 838 desc: "read range from start to empty, with ColumnRangeFilter", 839 rr: RowRange{}, 840 filter: ColumnRangeFilter("follows", "h", ""), 841 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 842 }, 843 { 844 desc: "read with RowKeyFilter", 845 rr: RowRange{}, 846 filter: RowKeyFilter(".*wash.*"), 847 want: "gwashington-jadams-1", 848 }, 849 { 850 desc: "read with RowKeyFilter, prefix", 851 rr: RowRange{}, 852 filter: RowKeyFilter("gwash"), 853 want: "", 854 }, 855 { 856 desc: "read with RowKeyFilter, no matches", 857 rr: RowRange{}, 858 filter: RowKeyFilter(".*xxx.*"), 859 want: "", 860 }, 861 { 862 desc: "read with FamilyFilter, no matches", 863 rr: RowRange{}, 864 filter: FamilyFilter(".*xxx.*"), 865 want: "", 866 }, 867 { 868 desc: "read with ColumnFilter + row limit", 869 rr: RowRange{}, 870 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 871 limit: LimitRows(2), 872 want: "gwashington-jadams-1,jadams-tjefferson-1", 873 }, 874 { 875 desc: "read all, strip values", 876 rr: RowRange{}, 877 filter: StripValueFilter(), 878 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 879 }, 880 { 881 desc: "read with ColumnFilter + row limit + strip values", 882 rr: RowRange{}, 883 filter: ChainFilters(ColumnFilter(".*j.*"), StripValueFilter()), // matches "jadams" and "tjefferson" 884 limit: LimitRows(2), 885 want: "gwashington-jadams-,jadams-tjefferson-", 886 }, 887 { 888 desc: "read with condition, strip values on true", 889 rr: RowRange{}, 890 filter: ConditionFilter(ColumnFilter(".*j.*"), StripValueFilter(), nil), 891 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 892 }, 893 { 894 desc: "read with condition, strip values on false", 895 rr: RowRange{}, 896 filter: ConditionFilter(ColumnFilter(".*xxx.*"), nil, StripValueFilter()), 897 want: "gwashington-jadams-,jadams-gwashington-,jadams-tjefferson-,tjefferson-gwashington-,tjefferson-jadams-,tjefferson-wmckinley-,wmckinley-tjefferson-", 898 }, 899 { 900 desc: "read with ValueRangeFilter + row limit", 901 rr: RowRange{}, 902 filter: ValueRangeFilter([]byte("1"), []byte("5")), // matches our value of "1" 903 limit: LimitRows(2), 904 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1", 905 }, 906 { 907 desc: "read with ValueRangeFilter, no match on exclusive end", 908 rr: RowRange{}, 909 filter: ValueRangeFilter([]byte("0"), []byte("1")), // no match 910 want: "", 911 }, 912 { 913 desc: "read with ValueRangeFilter, no matches", 914 rr: RowRange{}, 915 filter: ValueRangeFilter([]byte("3"), []byte("5")), // matches nothing 916 want: "", 917 }, 918 { 919 desc: "read with InterleaveFilter, no matches on all filters", 920 rr: RowRange{}, 921 filter: InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*")), 922 want: "", 923 }, 924 { 925 desc: "read with InterleaveFilter, no duplicate cells", 926 rr: RowRange{}, 927 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*j.*")), 928 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 929 }, 930 { 931 desc: "read with InterleaveFilter, with duplicate cells", 932 rr: RowRange{}, 933 filter: InterleaveFilters(ColumnFilter(".*g.*"), ColumnFilter(".*g.*")), 934 want: "jadams-gwashington-1,jadams-gwashington-1,tjefferson-gwashington-1,tjefferson-gwashington-1", 935 }, 936 { 937 desc: "read with a RowRangeList and no filter", 938 rr: RowRangeList{NewRange("gargamel", "hubbard"), InfiniteRange("wmckinley")}, 939 want: "gwashington-jadams-1,wmckinley-tjefferson-1", 940 }, 941 { 942 desc: "chain that excludes rows and matches nothing, in a condition", 943 rr: RowRange{}, 944 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), ColumnFilter(".*mckinley.*")), StripValueFilter(), nil), 945 want: "", 946 }, 947 { 948 desc: "chain that ends with an interleave that has no match. covers #804", 949 rr: RowRange{}, 950 filter: ConditionFilter(ChainFilters(ColumnFilter(".*j.*"), InterleaveFilters(ColumnFilter(".*x.*"), ColumnFilter(".*z.*"))), StripValueFilter(), nil), 951 want: "", 952 }, 953 } { 954 t.Run(test.desc, func(t *testing.T) { 955 var opts []ReadOption 956 if test.filter != nil { 957 opts = append(opts, RowFilter(test.filter)) 958 } 959 if test.limit != nil { 960 opts = append(opts, test.limit) 961 } 962 var elt []string 963 err := table.ReadRows(ctx, test.rr, func(r Row) bool { 964 for _, ris := range r { 965 for _, ri := range ris { 966 elt = append(elt, formatReadItem(ri)) 967 } 968 } 969 return true 970 }, opts...) 971 if err != nil { 972 t.Fatal(err) 973 } 974 if got := strings.Join(elt, ","); got != test.want { 975 t.Fatalf("got %q\nwant %q", got, test.want) 976 } 977 }) 978 } 979} 980 981func TestIntegration_SampleRowKeys(t *testing.T) { 982 ctx := context.Background() 983 _, _, table, _, cleanup, err := setupIntegration(ctx, t) 984 if err != nil { 985 t.Fatal(err) 986 } 987 defer cleanup() 988 989 // Insert some data. 990 initialData := map[string][]string{ 991 "wmckinley11": {"tjefferson11"}, 992 "gwashington77": {"jadams77"}, 993 "tjefferson0": {"gwashington0", "jadams0"}, 994 } 995 996 for row, ss := range initialData { 997 mut := NewMutation() 998 for _, name := range ss { 999 mut.Set("follows", name, 1000, []byte("1")) 1000 } 1001 if err := table.Apply(ctx, row, mut); err != nil { 1002 t.Fatalf("Mutating row %q: %v", row, err) 1003 } 1004 } 1005 sampleKeys, err := table.SampleRowKeys(context.Background()) 1006 if err != nil { 1007 t.Fatalf("%s: %v", "SampleRowKeys:", err) 1008 } 1009 if len(sampleKeys) == 0 { 1010 t.Error("SampleRowKeys length 0") 1011 } 1012} 1013 1014func TestIntegration_Admin(t *testing.T) { 1015 testEnv, err := NewIntegrationEnv() 1016 if err != nil { 1017 t.Fatalf("IntegrationEnv: %v", err) 1018 } 1019 defer testEnv.Close() 1020 1021 timeout := 2 * time.Second 1022 if testEnv.Config().UseProd { 1023 timeout = 5 * time.Minute 1024 } 1025 ctx, _ := context.WithTimeout(context.Background(), timeout) 1026 1027 adminClient, err := testEnv.NewAdminClient() 1028 if err != nil { 1029 t.Fatalf("NewAdminClient: %v", err) 1030 } 1031 defer adminClient.Close() 1032 1033 iAdminClient, err := testEnv.NewInstanceAdminClient() 1034 if err != nil { 1035 t.Fatalf("NewInstanceAdminClient: %v", err) 1036 } 1037 if iAdminClient != nil { 1038 defer iAdminClient.Close() 1039 1040 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1041 if err != nil { 1042 t.Errorf("InstanceInfo: %v", err) 1043 } 1044 if iInfo.Name != adminClient.instance { 1045 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1046 } 1047 } 1048 1049 list := func() []string { 1050 tbls, err := adminClient.Tables(ctx) 1051 if err != nil { 1052 t.Fatalf("Fetching list of tables: %v", err) 1053 } 1054 sort.Strings(tbls) 1055 return tbls 1056 } 1057 containsAll := func(got, want []string) bool { 1058 gotSet := make(map[string]bool) 1059 1060 for _, s := range got { 1061 gotSet[s] = true 1062 } 1063 for _, s := range want { 1064 if !gotSet[s] { 1065 return false 1066 } 1067 } 1068 return true 1069 } 1070 1071 defer adminClient.DeleteTable(ctx, "mytable") 1072 1073 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1074 t.Fatalf("Creating table: %v", err) 1075 } 1076 1077 defer adminClient.DeleteTable(ctx, "myothertable") 1078 1079 if err := adminClient.CreateTable(ctx, "myothertable"); err != nil { 1080 t.Fatalf("Creating table: %v", err) 1081 } 1082 1083 if got, want := list(), []string{"myothertable", "mytable"}; !containsAll(got, want) { 1084 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1085 } 1086 1087 must(adminClient.WaitForReplication(ctx, "mytable")) 1088 1089 if err := adminClient.DeleteTable(ctx, "myothertable"); err != nil { 1090 t.Fatalf("Deleting table: %v", err) 1091 } 1092 tables := list() 1093 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1094 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1095 } 1096 if got, unwanted := tables, []string{"myothertable"}; containsAll(got, unwanted) { 1097 t.Errorf("adminClient.Tables return %#v. unwanted %#v", got, unwanted) 1098 } 1099 1100 tblConf := TableConf{ 1101 TableID: "conftable", 1102 Families: map[string]GCPolicy{ 1103 "fam1": MaxVersionsPolicy(1), 1104 "fam2": MaxVersionsPolicy(2), 1105 }, 1106 } 1107 if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { 1108 t.Fatalf("Creating table from TableConf: %v", err) 1109 } 1110 defer adminClient.DeleteTable(ctx, tblConf.TableID) 1111 1112 tblInfo, err := adminClient.TableInfo(ctx, tblConf.TableID) 1113 if err != nil { 1114 t.Fatalf("Getting table info: %v", err) 1115 } 1116 sort.Strings(tblInfo.Families) 1117 wantFams := []string{"fam1", "fam2"} 1118 if !testutil.Equal(tblInfo.Families, wantFams) { 1119 t.Errorf("Column family mismatch, got %v, want %v", tblInfo.Families, wantFams) 1120 } 1121 1122 // Populate mytable and drop row ranges 1123 if err = adminClient.CreateColumnFamily(ctx, "mytable", "cf"); err != nil { 1124 t.Fatalf("Creating column family: %v", err) 1125 } 1126 1127 client, err := testEnv.NewClient() 1128 if err != nil { 1129 t.Fatalf("NewClient: %v", err) 1130 } 1131 defer client.Close() 1132 1133 tbl := client.Open("mytable") 1134 1135 prefixes := []string{"a", "b", "c"} 1136 for _, prefix := range prefixes { 1137 for i := 0; i < 5; i++ { 1138 mut := NewMutation() 1139 mut.Set("cf", "col", 1000, []byte("1")) 1140 if err := tbl.Apply(ctx, fmt.Sprintf("%v-%v", prefix, i), mut); err != nil { 1141 t.Fatalf("Mutating row: %v", err) 1142 } 1143 } 1144 } 1145 1146 if err = adminClient.DropRowRange(ctx, "mytable", "a"); err != nil { 1147 t.Errorf("DropRowRange a: %v", err) 1148 } 1149 if err = adminClient.DropRowRange(ctx, "mytable", "c"); err != nil { 1150 t.Errorf("DropRowRange c: %v", err) 1151 } 1152 if err = adminClient.DropRowRange(ctx, "mytable", "x"); err != nil { 1153 t.Errorf("DropRowRange x: %v", err) 1154 } 1155 1156 var gotRowCount int 1157 must(tbl.ReadRows(ctx, RowRange{}, func(row Row) bool { 1158 gotRowCount++ 1159 if !strings.HasPrefix(row.Key(), "b") { 1160 t.Errorf("Invalid row after dropping range: %v", row) 1161 } 1162 return true 1163 })) 1164 if gotRowCount != 5 { 1165 t.Errorf("Invalid row count after dropping range: got %v, want %v", gotRowCount, 5) 1166 } 1167} 1168 1169func TestIntegration_AdminCreateInstance(t *testing.T) { 1170 if instanceToCreate == "" { 1171 t.Skip("instanceToCreate not set, skipping instance creation testing") 1172 } 1173 1174 testEnv, err := NewIntegrationEnv() 1175 if err != nil { 1176 t.Fatalf("IntegrationEnv: %v", err) 1177 } 1178 defer testEnv.Close() 1179 1180 if !testEnv.Config().UseProd { 1181 t.Skip("emulator doesn't support instance creation") 1182 } 1183 1184 timeout := 5 * time.Minute 1185 ctx, _ := context.WithTimeout(context.Background(), timeout) 1186 1187 iAdminClient, err := testEnv.NewInstanceAdminClient() 1188 if err != nil { 1189 t.Fatalf("NewInstanceAdminClient: %v", err) 1190 } 1191 defer iAdminClient.Close() 1192 1193 clusterID := instanceToCreate + "-cluster" 1194 1195 // Create a development instance 1196 conf := &InstanceConf{ 1197 InstanceId: instanceToCreate, 1198 ClusterId: clusterID, 1199 DisplayName: "test instance", 1200 Zone: instanceToCreateZone, 1201 InstanceType: DEVELOPMENT, 1202 } 1203 if err := iAdminClient.CreateInstance(ctx, conf); err != nil { 1204 t.Fatalf("CreateInstance: %v", err) 1205 } 1206 defer iAdminClient.DeleteInstance(ctx, instanceToCreate) 1207 1208 iInfo, err := iAdminClient.InstanceInfo(ctx, instanceToCreate) 1209 if err != nil { 1210 t.Fatalf("InstanceInfo: %v", err) 1211 } 1212 1213 // Basic return values are tested elsewhere, check instance type 1214 if iInfo.InstanceType != DEVELOPMENT { 1215 t.Fatalf("Instance is not DEVELOPMENT: %v", err) 1216 } 1217 1218 // Update everything we can about the instance in one call. 1219 confWithClusters := &InstanceWithClustersConfig{ 1220 InstanceID: instanceToCreate, 1221 DisplayName: "new display name", 1222 InstanceType: PRODUCTION, 1223 Clusters: []ClusterConfig{ 1224 {ClusterID: clusterID, NumNodes: 5}}, 1225 } 1226 1227 if err = iAdminClient.UpdateInstanceWithClusters(ctx, confWithClusters); err != nil { 1228 t.Fatalf("UpdateInstanceWithClusters: %v", err) 1229 } 1230 1231 iInfo, err = iAdminClient.InstanceInfo(ctx, instanceToCreate) 1232 if err != nil { 1233 t.Fatalf("InstanceInfo: %v", err) 1234 } 1235 1236 if iInfo.InstanceType != PRODUCTION { 1237 t.Fatalf("Instance type is not PRODUCTION: %v", err) 1238 } 1239 if got, want := iInfo.DisplayName, confWithClusters.DisplayName; got != want { 1240 t.Fatalf("Display name: %q, want: %q", got, want) 1241 } 1242 1243 cInfo, err := iAdminClient.GetCluster(ctx, instanceToCreate, clusterID) 1244 if err != nil { 1245 t.Fatalf("GetCluster: %v", err) 1246 } 1247 1248 if cInfo.ServeNodes != 5 { 1249 t.Fatalf("NumNodes: %v, want: %v", cInfo.ServeNodes, 5) 1250 } 1251} 1252 1253func TestIntegration_AdminSnapshot(t *testing.T) { 1254 testEnv, err := NewIntegrationEnv() 1255 if err != nil { 1256 t.Fatalf("IntegrationEnv: %v", err) 1257 } 1258 defer testEnv.Close() 1259 1260 if !testEnv.Config().UseProd { 1261 t.Skip("emulator doesn't support snapshots") 1262 } 1263 1264 timeout := 2 * time.Second 1265 if testEnv.Config().UseProd { 1266 timeout = 5 * time.Minute 1267 } 1268 ctx, _ := context.WithTimeout(context.Background(), timeout) 1269 1270 adminClient, err := testEnv.NewAdminClient() 1271 if err != nil { 1272 t.Fatalf("NewAdminClient: %v", err) 1273 } 1274 defer adminClient.Close() 1275 1276 table := testEnv.Config().Table 1277 cluster := testEnv.Config().Cluster 1278 1279 list := func(cluster string) ([]*SnapshotInfo, error) { 1280 infos := []*SnapshotInfo(nil) 1281 1282 it := adminClient.Snapshots(ctx, cluster) 1283 for { 1284 s, err := it.Next() 1285 if err == iterator.Done { 1286 break 1287 } 1288 if err != nil { 1289 return nil, err 1290 } 1291 infos = append(infos, s) 1292 } 1293 return infos, err 1294 } 1295 1296 // Delete the table at the end of the test. Schedule ahead of time 1297 // in case the client fails 1298 defer adminClient.DeleteTable(ctx, table) 1299 1300 if err := adminClient.CreateTable(ctx, table); err != nil { 1301 t.Fatalf("Creating table: %v", err) 1302 } 1303 1304 // Precondition: no snapshots 1305 snapshots, err := list(cluster) 1306 if err != nil { 1307 t.Fatalf("Initial snapshot list: %v", err) 1308 } 1309 if got, want := len(snapshots), 0; got != want { 1310 t.Fatalf("Initial snapshot list len: %d, want: %d", got, want) 1311 } 1312 1313 // Create snapshot 1314 defer adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot") 1315 1316 if err = adminClient.SnapshotTable(ctx, table, cluster, "mysnapshot", 5*time.Hour); err != nil { 1317 t.Fatalf("Creating snaphot: %v", err) 1318 } 1319 1320 // List snapshot 1321 snapshots, err = list(cluster) 1322 if err != nil { 1323 t.Fatalf("Listing snapshots: %v", err) 1324 } 1325 if got, want := len(snapshots), 1; got != want { 1326 t.Fatalf("Listing snapshot count: %d, want: %d", got, want) 1327 } 1328 if got, want := snapshots[0].Name, "mysnapshot"; got != want { 1329 t.Fatalf("Snapshot name: %s, want: %s", got, want) 1330 } 1331 if got, want := snapshots[0].SourceTable, table; got != want { 1332 t.Fatalf("Snapshot SourceTable: %s, want: %s", got, want) 1333 } 1334 if got, want := snapshots[0].DeleteTime, snapshots[0].CreateTime.Add(5*time.Hour); math.Abs(got.Sub(want).Minutes()) > 1 { 1335 t.Fatalf("Snapshot DeleteTime: %s, want: %s", got, want) 1336 } 1337 1338 // Get snapshot 1339 snapshot, err := adminClient.SnapshotInfo(ctx, cluster, "mysnapshot") 1340 if err != nil { 1341 t.Fatalf("SnapshotInfo: %v", snapshot) 1342 } 1343 if got, want := *snapshot, *snapshots[0]; got != want { 1344 t.Fatalf("SnapshotInfo: %v, want: %v", got, want) 1345 } 1346 1347 // Restore 1348 restoredTable := table + "-restored" 1349 defer adminClient.DeleteTable(ctx, restoredTable) 1350 if err = adminClient.CreateTableFromSnapshot(ctx, restoredTable, cluster, "mysnapshot"); err != nil { 1351 t.Fatalf("CreateTableFromSnapshot: %v", err) 1352 } 1353 if _, err := adminClient.TableInfo(ctx, restoredTable); err != nil { 1354 t.Fatalf("Restored TableInfo: %v", err) 1355 } 1356 1357 // Delete snapshot 1358 if err = adminClient.DeleteSnapshot(ctx, cluster, "mysnapshot"); err != nil { 1359 t.Fatalf("DeleteSnapshot: %v", err) 1360 } 1361 snapshots, err = list(cluster) 1362 if err != nil { 1363 t.Fatalf("List after Delete: %v", err) 1364 } 1365 if got, want := len(snapshots), 0; got != want { 1366 t.Fatalf("List after delete len: %d, want: %d", got, want) 1367 } 1368} 1369 1370func TestIntegration_Granularity(t *testing.T) { 1371 testEnv, err := NewIntegrationEnv() 1372 if err != nil { 1373 t.Fatalf("IntegrationEnv: %v", err) 1374 } 1375 defer testEnv.Close() 1376 1377 timeout := 2 * time.Second 1378 if testEnv.Config().UseProd { 1379 timeout = 5 * time.Minute 1380 } 1381 ctx, _ := context.WithTimeout(context.Background(), timeout) 1382 1383 adminClient, err := testEnv.NewAdminClient() 1384 if err != nil { 1385 t.Fatalf("NewAdminClient: %v", err) 1386 } 1387 defer adminClient.Close() 1388 1389 list := func() []string { 1390 tbls, err := adminClient.Tables(ctx) 1391 if err != nil { 1392 t.Fatalf("Fetching list of tables: %v", err) 1393 } 1394 sort.Strings(tbls) 1395 return tbls 1396 } 1397 containsAll := func(got, want []string) bool { 1398 gotSet := make(map[string]bool) 1399 1400 for _, s := range got { 1401 gotSet[s] = true 1402 } 1403 for _, s := range want { 1404 if !gotSet[s] { 1405 return false 1406 } 1407 } 1408 return true 1409 } 1410 1411 defer adminClient.DeleteTable(ctx, "mytable") 1412 1413 if err := adminClient.CreateTable(ctx, "mytable"); err != nil { 1414 t.Fatalf("Creating table: %v", err) 1415 } 1416 1417 tables := list() 1418 if got, want := tables, []string{"mytable"}; !containsAll(got, want) { 1419 t.Errorf("adminClient.Tables returned %#v, want %#v", got, want) 1420 } 1421 1422 // calling ModifyColumnFamilies to check the granularity of table 1423 prefix := adminClient.instancePrefix() 1424 req := &btapb.ModifyColumnFamiliesRequest{ 1425 Name: prefix + "/tables/" + "mytable", 1426 Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ 1427 Id: "cf", 1428 Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Create{&btapb.ColumnFamily{}}, 1429 }}, 1430 } 1431 table, err := adminClient.tClient.ModifyColumnFamilies(ctx, req) 1432 if err != nil { 1433 t.Fatalf("Creating column family: %v", err) 1434 } 1435 if table.Granularity != btapb.Table_TimestampGranularity(btapb.Table_MILLIS) { 1436 t.Errorf("ModifyColumnFamilies returned granularity %#v, want %#v", table.Granularity, btapb.Table_TimestampGranularity(btapb.Table_MILLIS)) 1437 } 1438} 1439 1440func TestIntegration_InstanceAdminClient_AppProfile(t *testing.T) { 1441 testEnv, err := NewIntegrationEnv() 1442 if err != nil { 1443 t.Fatalf("IntegrationEnv: %v", err) 1444 } 1445 defer testEnv.Close() 1446 1447 timeout := 2 * time.Second 1448 if testEnv.Config().UseProd { 1449 timeout = 5 * time.Minute 1450 } 1451 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1452 defer cancel() 1453 1454 adminClient, err := testEnv.NewAdminClient() 1455 if err != nil { 1456 t.Fatalf("NewAdminClient: %v", err) 1457 } 1458 defer adminClient.Close() 1459 1460 iAdminClient, err := testEnv.NewInstanceAdminClient() 1461 if err != nil { 1462 t.Fatalf("NewInstanceAdminClient: %v", err) 1463 } 1464 1465 if iAdminClient == nil { 1466 return 1467 } 1468 1469 defer iAdminClient.Close() 1470 profile := ProfileConf{ 1471 ProfileID: "app_profile1", 1472 InstanceID: adminClient.instance, 1473 ClusterID: testEnv.Config().Cluster, 1474 Description: "creating new app profile 1", 1475 RoutingPolicy: SingleClusterRouting, 1476 } 1477 1478 createdProfile, err := iAdminClient.CreateAppProfile(ctx, profile) 1479 if err != nil { 1480 t.Fatalf("Creating app profile: %v", err) 1481 1482 } 1483 1484 gotProfile, err := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1485 1486 if err != nil { 1487 t.Fatalf("Get app profile: %v", err) 1488 } 1489 1490 if !proto.Equal(createdProfile, gotProfile) { 1491 t.Fatalf("created profile: %s, got profile: %s", createdProfile.Name, gotProfile.Name) 1492 1493 } 1494 1495 list := func(instanceID string) ([]*btapb.AppProfile, error) { 1496 profiles := []*btapb.AppProfile(nil) 1497 1498 it := iAdminClient.ListAppProfiles(ctx, instanceID) 1499 for { 1500 s, err := it.Next() 1501 if err == iterator.Done { 1502 break 1503 } 1504 if err != nil { 1505 return nil, err 1506 } 1507 profiles = append(profiles, s) 1508 } 1509 return profiles, err 1510 } 1511 1512 profiles, err := list(adminClient.instance) 1513 if err != nil { 1514 t.Fatalf("List app profile: %v", err) 1515 } 1516 1517 if got, want := len(profiles), 1; got != want { 1518 t.Fatalf("Initial app profile list len: %d, want: %d", got, want) 1519 } 1520 1521 for _, test := range []struct { 1522 desc string 1523 uattrs ProfileAttrsToUpdate 1524 want *btapb.AppProfile // nil means error 1525 }{ 1526 { 1527 desc: "empty update", 1528 uattrs: ProfileAttrsToUpdate{}, 1529 want: nil, 1530 }, 1531 1532 { 1533 desc: "empty description update", 1534 uattrs: ProfileAttrsToUpdate{Description: ""}, 1535 want: &btapb.AppProfile{ 1536 Name: gotProfile.Name, 1537 Description: "", 1538 RoutingPolicy: gotProfile.RoutingPolicy, 1539 Etag: gotProfile.Etag}, 1540 }, 1541 { 1542 desc: "routing update", 1543 uattrs: ProfileAttrsToUpdate{ 1544 RoutingPolicy: SingleClusterRouting, 1545 ClusterID: testEnv.Config().Cluster, 1546 }, 1547 want: &btapb.AppProfile{ 1548 Name: gotProfile.Name, 1549 Description: "", 1550 Etag: gotProfile.Etag, 1551 RoutingPolicy: &btapb.AppProfile_SingleClusterRouting_{ 1552 SingleClusterRouting: &btapb.AppProfile_SingleClusterRouting{ 1553 ClusterId: testEnv.Config().Cluster, 1554 }}, 1555 }, 1556 }, 1557 } { 1558 err = iAdminClient.UpdateAppProfile(ctx, adminClient.instance, "app_profile1", test.uattrs) 1559 if err != nil { 1560 if test.want != nil { 1561 t.Errorf("%s: %v", test.desc, err) 1562 } 1563 continue 1564 } 1565 if err == nil && test.want == nil { 1566 t.Errorf("%s: got nil, want error", test.desc) 1567 continue 1568 } 1569 1570 got, _ := iAdminClient.GetAppProfile(ctx, adminClient.instance, "app_profile1") 1571 1572 if !proto.Equal(got, test.want) { 1573 t.Fatalf("%s : got profile : %v, want profile: %v", test.desc, gotProfile, test.want) 1574 } 1575 1576 } 1577 1578 err = iAdminClient.DeleteAppProfile(ctx, adminClient.instance, "app_profile1") 1579 if err != nil { 1580 t.Fatalf("Delete app profile: %v", err) 1581 } 1582 1583} 1584 1585func TestIntegration_InstanceUpdate(t *testing.T) { 1586 testEnv, err := NewIntegrationEnv() 1587 if err != nil { 1588 t.Fatalf("IntegrationEnv: %v", err) 1589 } 1590 defer testEnv.Close() 1591 1592 timeout := 2 * time.Second 1593 if testEnv.Config().UseProd { 1594 timeout = 5 * time.Minute 1595 } 1596 ctx, cancel := context.WithTimeout(context.Background(), timeout) 1597 defer cancel() 1598 1599 adminClient, err := testEnv.NewAdminClient() 1600 if err != nil { 1601 t.Fatalf("NewAdminClient: %v", err) 1602 } 1603 1604 defer adminClient.Close() 1605 1606 iAdminClient, err := testEnv.NewInstanceAdminClient() 1607 if err != nil { 1608 t.Fatalf("NewInstanceAdminClient: %v", err) 1609 } 1610 1611 if iAdminClient == nil { 1612 return 1613 } 1614 1615 defer iAdminClient.Close() 1616 1617 iInfo, err := iAdminClient.InstanceInfo(ctx, adminClient.instance) 1618 if err != nil { 1619 t.Errorf("InstanceInfo: %v", err) 1620 } 1621 1622 if iInfo.Name != adminClient.instance { 1623 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1624 } 1625 1626 if iInfo.DisplayName != adminClient.instance { 1627 t.Errorf("InstanceInfo returned name %#v, want %#v", iInfo.Name, adminClient.instance) 1628 } 1629 1630 const numNodes = 4 1631 // update cluster nodes 1632 if err := iAdminClient.UpdateCluster(ctx, adminClient.instance, testEnv.Config().Cluster, int32(numNodes)); err != nil { 1633 t.Errorf("UpdateCluster: %v", err) 1634 } 1635 1636 // get cluster after updating 1637 cis, err := iAdminClient.GetCluster(ctx, adminClient.instance, testEnv.Config().Cluster) 1638 if err != nil { 1639 t.Errorf("GetCluster %v", err) 1640 } 1641 if cis.ServeNodes != int(numNodes) { 1642 t.Errorf("ServeNodes returned %d, want %d", cis.ServeNodes, int(numNodes)) 1643 } 1644} 1645 1646func setupIntegration(ctx context.Context, t *testing.T) (_ *Client, _ *AdminClient, table *Table, tableName string, cleanup func(), _ error) { 1647 testEnv, err := NewIntegrationEnv() 1648 if err != nil { 1649 return nil, nil, nil, "", nil, err 1650 } 1651 1652 var timeout time.Duration 1653 if testEnv.Config().UseProd { 1654 timeout = 10 * time.Minute 1655 t.Logf("Running test against production") 1656 } else { 1657 timeout = 1 * time.Minute 1658 t.Logf("bttest.Server running on %s", testEnv.Config().AdminEndpoint) 1659 } 1660 ctx, cancel := context.WithTimeout(ctx, timeout) 1661 defer cancel() 1662 1663 client, err := testEnv.NewClient() 1664 if err != nil { 1665 return nil, nil, nil, "", nil, err 1666 } 1667 1668 adminClient, err := testEnv.NewAdminClient() 1669 if err != nil { 1670 return nil, nil, nil, "", nil, err 1671 } 1672 1673 tableName = testEnv.Config().Table 1674 if err := adminClient.CreateTable(ctx, tableName); err != nil { 1675 return nil, nil, nil, "", nil, err 1676 } 1677 if err := adminClient.CreateColumnFamily(ctx, tableName, "follows"); err != nil { 1678 return nil, nil, nil, "", nil, err 1679 } 1680 1681 return client, adminClient, client.Open(tableName), tableName, func() { 1682 adminClient.DeleteTable(ctx, tableName) 1683 client.Close() 1684 adminClient.Close() 1685 }, nil 1686} 1687 1688func formatReadItem(ri ReadItem) string { 1689 // Use the column qualifier only to make the test data briefer. 1690 col := ri.Column[strings.Index(ri.Column, ":")+1:] 1691 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 1692} 1693 1694func fill(b, sub []byte) { 1695 for len(b) > len(sub) { 1696 n := copy(b, sub) 1697 b = b[n:] 1698 } 1699} 1700 1701func clearTimestamps(r Row) { 1702 for _, ris := range r { 1703 for i := range ris { 1704 ris[i].Timestamp = 0 1705 } 1706 } 1707} 1708