1/* 2Copyright 2015 Google Inc. All Rights Reserved. 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package bigtable 18 19import ( 20 "flag" 21 "fmt" 22 "math/rand" 23 "reflect" 24 "sort" 25 "strings" 26 "sync" 27 "testing" 28 "time" 29 30 "cloud.google.com/go/bigtable/bttest" 31 "golang.org/x/net/context" 32 "google.golang.org/api/option" 33 "google.golang.org/grpc" 34) 35 36func TestPrefix(t *testing.T) { 37 tests := []struct { 38 prefix, succ string 39 }{ 40 {"", ""}, 41 {"\xff", ""}, // when used, "" means Infinity 42 {"x\xff", "y"}, 43 {"\xfe", "\xff"}, 44 } 45 for _, tc := range tests { 46 got := prefixSuccessor(tc.prefix) 47 if got != tc.succ { 48 t.Errorf("prefixSuccessor(%q) = %q, want %s", tc.prefix, got, tc.succ) 49 continue 50 } 51 r := PrefixRange(tc.prefix) 52 if tc.succ == "" && r.limit != "" { 53 t.Errorf("PrefixRange(%q) got limit %q", tc.prefix, r.limit) 54 } 55 if tc.succ != "" && r.limit != tc.succ { 56 t.Errorf("PrefixRange(%q) got limit %q, want %q", tc.prefix, r.limit, tc.succ) 57 } 58 } 59} 60 61var useProd = flag.String("use_prod", "", `if set to "proj,instance,table", run integration test against production`) 62 63func TestClientIntegration(t *testing.T) { 64 start := time.Now() 65 lastCheckpoint := start 66 checkpoint := func(s string) { 67 n := time.Now() 68 t.Logf("[%s] %v since start, %v since last checkpoint", s, n.Sub(start), n.Sub(lastCheckpoint)) 69 lastCheckpoint = n 70 } 71 72 proj, instance, table := "proj", "instance", "mytable" 73 var clientOpts []option.ClientOption 74 timeout := 10 * time.Second 75 if *useProd == "" { 76 srv, err := bttest.NewServer("127.0.0.1:0") 77 if err != nil { 78 t.Fatal(err) 79 } 80 defer srv.Close() 81 t.Logf("bttest.Server running on %s", srv.Addr) 82 conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) 83 if err != nil { 84 t.Fatalf("grpc.Dial: %v", err) 85 } 86 clientOpts = []option.ClientOption{option.WithGRPCConn(conn)} 87 } else { 88 t.Logf("Running test against production") 89 a := strings.SplitN(*useProd, ",", 3) 90 proj, instance, table = a[0], a[1], a[2] 91 timeout = 5 * time.Minute 92 } 93 94 ctx, _ := context.WithTimeout(context.Background(), timeout) 95 96 client, err := NewClient(ctx, proj, instance, clientOpts...) 97 if err != nil { 98 t.Fatalf("NewClient: %v", err) 99 } 100 defer client.Close() 101 checkpoint("dialed Client") 102 103 adminClient, err := NewAdminClient(ctx, proj, instance, clientOpts...) 104 if err != nil { 105 t.Fatalf("NewAdminClient: %v", err) 106 } 107 defer adminClient.Close() 108 checkpoint("dialed AdminClient") 109 110 // Delete the table at the end of the test. 111 // Do this even before creating the table so that if this is running 112 // against production and CreateTable fails there's a chance of cleaning it up. 113 defer adminClient.DeleteTable(ctx, table) 114 115 if err := adminClient.CreateTable(ctx, table); err != nil { 116 t.Fatalf("Creating table: %v", err) 117 } 118 checkpoint("created table") 119 if err := adminClient.CreateColumnFamily(ctx, table, "follows"); err != nil { 120 t.Fatalf("Creating column family: %v", err) 121 } 122 checkpoint(`created "follows" column family`) 123 124 tbl := client.Open(table) 125 126 // Insert some data. 127 initialData := map[string][]string{ 128 "wmckinley": []string{"tjefferson"}, 129 "gwashington": []string{"jadams"}, 130 "tjefferson": []string{"gwashington", "jadams"}, // wmckinley set conditionally below 131 "jadams": []string{"gwashington", "tjefferson"}, 132 } 133 for row, ss := range initialData { 134 mut := NewMutation() 135 for _, name := range ss { 136 mut.Set("follows", name, 0, []byte("1")) 137 } 138 if err := tbl.Apply(ctx, row, mut); err != nil { 139 t.Errorf("Mutating row %q: %v", row, err) 140 } 141 } 142 checkpoint("inserted initial data") 143 144 // Do a conditional mutation with a complex filter. 145 mutTrue := NewMutation() 146 mutTrue.Set("follows", "wmckinley", 0, []byte("1")) 147 filter := ChainFilters(ColumnFilter("gwash[iz].*"), ValueFilter(".")) 148 mut := NewCondMutation(filter, mutTrue, nil) 149 if err := tbl.Apply(ctx, "tjefferson", mut); err != nil { 150 t.Errorf("Conditionally mutating row: %v", err) 151 } 152 // Do a second condition mutation with a filter that does not match, 153 // and thus no changes should be made. 154 mutTrue = NewMutation() 155 mutTrue.DeleteRow() 156 filter = ColumnFilter("snoop.dogg") 157 mut = NewCondMutation(filter, mutTrue, nil) 158 if err := tbl.Apply(ctx, "tjefferson", mut); err != nil { 159 t.Errorf("Conditionally mutating row: %v", err) 160 } 161 checkpoint("did two conditional mutations") 162 163 // Fetch a row. 164 row, err := tbl.ReadRow(ctx, "jadams") 165 if err != nil { 166 t.Fatalf("Reading a row: %v", err) 167 } 168 wantRow := Row{ 169 "follows": []ReadItem{ 170 {Row: "jadams", Column: "follows:gwashington", Value: []byte("1")}, 171 {Row: "jadams", Column: "follows:tjefferson", Value: []byte("1")}, 172 }, 173 } 174 for _, ris := range row { 175 sort.Sort(byColumn(ris)) 176 } 177 if !reflect.DeepEqual(row, wantRow) { 178 t.Errorf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 179 } 180 checkpoint("tested ReadRow") 181 182 // Do a bunch of reads with filters. 183 readTests := []struct { 184 desc string 185 rr RowRange 186 filter Filter // may be nil 187 188 // We do the read, grab all the cells, turn them into "<row>-<col>-<val>", 189 // sort that list, and join with a comma. 190 want string 191 }{ 192 { 193 desc: "read all, unfiltered", 194 rr: RowRange{}, 195 want: "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 196 }, 197 { 198 desc: "read with InfiniteRange, unfiltered", 199 rr: InfiniteRange("tjefferson"), 200 want: "tjefferson-gwashington-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1", 201 }, 202 { 203 desc: "read with NewRange, unfiltered", 204 rr: NewRange("gargamel", "hubbard"), 205 want: "gwashington-jadams-1", 206 }, 207 { 208 desc: "read with PrefixRange, unfiltered", 209 rr: PrefixRange("jad"), 210 want: "jadams-gwashington-1,jadams-tjefferson-1", 211 }, 212 { 213 desc: "read with SingleRow, unfiltered", 214 rr: SingleRow("wmckinley"), 215 want: "wmckinley-tjefferson-1", 216 }, 217 { 218 desc: "read all, with ColumnFilter", 219 rr: RowRange{}, 220 filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson" 221 want: "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,wmckinley-tjefferson-1", 222 }, 223 } 224 for _, tc := range readTests { 225 var opts []ReadOption 226 if tc.filter != nil { 227 opts = append(opts, RowFilter(tc.filter)) 228 } 229 var elt []string 230 err := tbl.ReadRows(context.Background(), tc.rr, func(r Row) bool { 231 for _, ris := range r { 232 for _, ri := range ris { 233 elt = append(elt, formatReadItem(ri)) 234 } 235 } 236 return true 237 }, opts...) 238 if err != nil { 239 t.Errorf("%s: %v", tc.desc, err) 240 continue 241 } 242 sort.Strings(elt) 243 if got := strings.Join(elt, ","); got != tc.want { 244 t.Errorf("%s: wrong reads.\n got %q\nwant %q", tc.desc, got, tc.want) 245 } 246 } 247 // Read a RowList 248 var elt []string 249 keys := RowList{"wmckinley", "gwashington", "jadams"} 250 want := "gwashington-jadams-1,jadams-gwashington-1,jadams-tjefferson-1,wmckinley-tjefferson-1" 251 err = tbl.ReadRows(ctx, keys, func(r Row) bool { 252 for _, ris := range r { 253 for _, ri := range ris { 254 elt = append(elt, formatReadItem(ri)) 255 } 256 } 257 return true 258 }) 259 if err != nil { 260 t.Errorf("read RowList: %v", err) 261 } 262 263 sort.Strings(elt) 264 if got := strings.Join(elt, ","); got != want { 265 t.Errorf("bulk read: wrong reads.\n got %q\nwant %q", got, want) 266 } 267 checkpoint("tested ReadRows in a few ways") 268 269 // Do a scan and stop part way through. 270 // Verify that the ReadRows callback doesn't keep running. 271 stopped := false 272 err = tbl.ReadRows(ctx, InfiniteRange(""), func(r Row) bool { 273 if r.Key() < "h" { 274 return true 275 } 276 if !stopped { 277 stopped = true 278 return false 279 } 280 t.Errorf("ReadRows kept scanning to row %q after being told to stop", r.Key()) 281 return false 282 }) 283 if err != nil { 284 t.Errorf("Partial ReadRows: %v", err) 285 } 286 checkpoint("did partial ReadRows test") 287 288 // Delete a row and check it goes away. 289 mut = NewMutation() 290 mut.DeleteRow() 291 if err := tbl.Apply(ctx, "wmckinley", mut); err != nil { 292 t.Errorf("Apply DeleteRow: %v", err) 293 } 294 row, err = tbl.ReadRow(ctx, "wmckinley") 295 if err != nil { 296 t.Fatalf("Reading a row after DeleteRow: %v", err) 297 } 298 if len(row) != 0 { 299 t.Fatalf("Read non-zero row after DeleteRow: %v", row) 300 } 301 checkpoint("exercised DeleteRow") 302 303 // Check ReadModifyWrite. 304 305 if err := adminClient.CreateColumnFamily(ctx, table, "counter"); err != nil { 306 t.Fatalf("Creating column family: %v", err) 307 } 308 309 appendRMW := func(b []byte) *ReadModifyWrite { 310 rmw := NewReadModifyWrite() 311 rmw.AppendValue("counter", "likes", b) 312 return rmw 313 } 314 incRMW := func(n int64) *ReadModifyWrite { 315 rmw := NewReadModifyWrite() 316 rmw.Increment("counter", "likes", n) 317 return rmw 318 } 319 rmwSeq := []struct { 320 desc string 321 rmw *ReadModifyWrite 322 want []byte 323 }{ 324 { 325 desc: "append #1", 326 rmw: appendRMW([]byte{0, 0, 0}), 327 want: []byte{0, 0, 0}, 328 }, 329 { 330 desc: "append #2", 331 rmw: appendRMW([]byte{0, 0, 0, 0, 17}), // the remaining 40 bits to make a big-endian 17 332 want: []byte{0, 0, 0, 0, 0, 0, 0, 17}, 333 }, 334 { 335 desc: "increment", 336 rmw: incRMW(8), 337 want: []byte{0, 0, 0, 0, 0, 0, 0, 25}, 338 }, 339 } 340 for _, step := range rmwSeq { 341 row, err := tbl.ApplyReadModifyWrite(ctx, "gwashington", step.rmw) 342 if err != nil { 343 t.Fatalf("ApplyReadModifyWrite %+v: %v", step.rmw, err) 344 } 345 clearTimestamps(row) 346 wantRow := Row{"counter": []ReadItem{{Row: "gwashington", Column: "counter:likes", Value: step.want}}} 347 if !reflect.DeepEqual(row, wantRow) { 348 t.Fatalf("After %s,\n got %v\nwant %v", step.desc, row, wantRow) 349 } 350 } 351 checkpoint("tested ReadModifyWrite") 352 353 // Test arbitrary timestamps more thoroughly. 354 if err := adminClient.CreateColumnFamily(ctx, table, "ts"); err != nil { 355 t.Fatalf("Creating column family: %v", err) 356 } 357 const numVersions = 4 358 mut = NewMutation() 359 for i := 0; i < numVersions; i++ { 360 // Timestamps are used in thousands because the server 361 // only permits that granularity. 362 mut.Set("ts", "col", Timestamp(i*1000), []byte(fmt.Sprintf("val-%d", i))) 363 } 364 if err := tbl.Apply(ctx, "testrow", mut); err != nil { 365 t.Fatalf("Mutating row: %v", err) 366 } 367 r, err := tbl.ReadRow(ctx, "testrow") 368 if err != nil { 369 t.Fatalf("Reading row: %v", err) 370 } 371 wantRow = Row{"ts": []ReadItem{ 372 // These should be returned in descending timestamp order. 373 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 374 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 375 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 376 {Row: "testrow", Column: "ts:col", Timestamp: 0, Value: []byte("val-0")}, 377 }} 378 if !reflect.DeepEqual(r, wantRow) { 379 t.Errorf("Cell with multiple versions,\n got %v\nwant %v", r, wantRow) 380 } 381 // Do the same read, but filter to the latest two versions. 382 r, err = tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 383 if err != nil { 384 t.Fatalf("Reading row: %v", err) 385 } 386 wantRow = Row{"ts": []ReadItem{ 387 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 388 {Row: "testrow", Column: "ts:col", Timestamp: 2000, Value: []byte("val-2")}, 389 }} 390 if !reflect.DeepEqual(r, wantRow) { 391 t.Errorf("Cell with multiple versions and LatestNFilter(2),\n got %v\nwant %v", r, wantRow) 392 } 393 // Delete the cell with timestamp 2000 and repeat the last read, 394 // checking that we get ts 3000 and ts 1000. 395 mut = NewMutation() 396 mut.DeleteTimestampRange("ts", "col", 2000, 3000) // half-open interval 397 if err := tbl.Apply(ctx, "testrow", mut); err != nil { 398 t.Fatalf("Mutating row: %v", err) 399 } 400 r, err = tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(2))) 401 if err != nil { 402 t.Fatalf("Reading row: %v", err) 403 } 404 wantRow = Row{"ts": []ReadItem{ 405 {Row: "testrow", Column: "ts:col", Timestamp: 3000, Value: []byte("val-3")}, 406 {Row: "testrow", Column: "ts:col", Timestamp: 1000, Value: []byte("val-1")}, 407 }} 408 if !reflect.DeepEqual(r, wantRow) { 409 t.Errorf("Cell with multiple versions and LatestNFilter(2), after deleting timestamp 2000,\n got %v\nwant %v", r, wantRow) 410 } 411 checkpoint("tested multiple versions in a cell") 412 413 // Do highly concurrent reads/writes. 414 // TODO(dsymonds): Raise this to 1000 when https://github.com/grpc/grpc-go/issues/205 is resolved. 415 const maxConcurrency = 100 416 var wg sync.WaitGroup 417 for i := 0; i < maxConcurrency; i++ { 418 wg.Add(1) 419 go func() { 420 defer wg.Done() 421 switch r := rand.Intn(100); { // r ∈ [0,100) 422 case 0 <= r && r < 30: 423 // Do a read. 424 _, err := tbl.ReadRow(ctx, "testrow", RowFilter(LatestNFilter(1))) 425 if err != nil { 426 t.Errorf("Concurrent read: %v", err) 427 } 428 case 30 <= r && r < 100: 429 // Do a write. 430 mut := NewMutation() 431 mut.Set("ts", "col", 0, []byte("data")) 432 if err := tbl.Apply(ctx, "testrow", mut); err != nil { 433 t.Errorf("Concurrent write: %v", err) 434 } 435 } 436 }() 437 } 438 wg.Wait() 439 checkpoint("tested high concurrency") 440 441 // Large reads, writes and scans. 442 bigBytes := make([]byte, 3<<20) // 3 MB is large, but less than current gRPC max of 4 MB. 443 nonsense := []byte("lorem ipsum dolor sit amet, ") 444 fill(bigBytes, nonsense) 445 mut = NewMutation() 446 mut.Set("ts", "col", 0, bigBytes) 447 if err := tbl.Apply(ctx, "bigrow", mut); err != nil { 448 t.Errorf("Big write: %v", err) 449 } 450 r, err = tbl.ReadRow(ctx, "bigrow") 451 if err != nil { 452 t.Errorf("Big read: %v", err) 453 } 454 wantRow = Row{"ts": []ReadItem{ 455 {Row: "bigrow", Column: "ts:col", Value: bigBytes}, 456 }} 457 if !reflect.DeepEqual(r, wantRow) { 458 t.Errorf("Big read returned incorrect bytes: %v", r) 459 } 460 // Now write 1000 rows, each with 82 KB values, then scan them all. 461 medBytes := make([]byte, 82<<10) 462 fill(medBytes, nonsense) 463 sem := make(chan int, 50) // do up to 50 mutations at a time. 464 for i := 0; i < 1000; i++ { 465 mut := NewMutation() 466 mut.Set("ts", "big-scan", 0, medBytes) 467 row := fmt.Sprintf("row-%d", i) 468 wg.Add(1) 469 go func() { 470 defer wg.Done() 471 defer func() { <-sem }() 472 sem <- 1 473 if err := tbl.Apply(ctx, row, mut); err != nil { 474 t.Errorf("Preparing large scan: %v", err) 475 } 476 }() 477 } 478 wg.Wait() 479 n := 0 480 err = tbl.ReadRows(ctx, PrefixRange("row-"), func(r Row) bool { 481 for _, ris := range r { 482 for _, ri := range ris { 483 n += len(ri.Value) 484 } 485 } 486 return true 487 }, RowFilter(ColumnFilter("big-scan"))) 488 if err != nil { 489 t.Errorf("Doing large scan: %v", err) 490 } 491 if want := 1000 * len(medBytes); n != want { 492 t.Errorf("Large scan returned %d bytes, want %d", n, want) 493 } 494 checkpoint("tested big read/write/scan") 495 496 // Test bulk mutations 497 if err := adminClient.CreateColumnFamily(ctx, table, "bulk"); err != nil { 498 t.Fatalf("Creating column family: %v", err) 499 } 500 bulkData := map[string][]string{ 501 "red sox": []string{"2004", "2007", "2013"}, 502 "patriots": []string{"2001", "2003", "2004", "2014"}, 503 "celtics": []string{"1981", "1984", "1986", "2008"}, 504 } 505 var rowKeys []string 506 var muts []*Mutation 507 for row, ss := range bulkData { 508 mut := NewMutation() 509 for _, name := range ss { 510 mut.Set("bulk", name, 0, []byte("1")) 511 } 512 rowKeys = append(rowKeys, row) 513 muts = append(muts, mut) 514 } 515 status, err := tbl.ApplyBulk(ctx, rowKeys, muts) 516 if err != nil { 517 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 518 } 519 if status != nil { 520 t.Errorf("non-nil errors: %v", err) 521 } 522 checkpoint("inserted bulk data") 523 524 // Read each row back 525 for rowKey, ss := range bulkData { 526 row, err := tbl.ReadRow(ctx, rowKey) 527 if err != nil { 528 t.Fatalf("Reading a bulk row: %v", err) 529 } 530 for _, ris := range row { 531 sort.Sort(byColumn(ris)) 532 } 533 var wantItems []ReadItem 534 for _, val := range ss { 535 wantItems = append(wantItems, ReadItem{Row: rowKey, Column: "bulk:" + val, Value: []byte("1")}) 536 } 537 wantRow := Row{"bulk": wantItems} 538 if !reflect.DeepEqual(row, wantRow) { 539 t.Errorf("Read row mismatch.\n got %#v\nwant %#v", row, wantRow) 540 } 541 } 542 checkpoint("tested reading from bulk insert") 543 544 // Test bulk write errors 545 badMut := NewMutation() 546 badMut.Set("badfamily", "col", -1, nil) 547 badMut2 := NewMutation() 548 badMut2.Set("badfamily2", "goodcol", -1, []byte("1")) 549 status, err = tbl.ApplyBulk(ctx, []string{"badrow", "badrow2"}, []*Mutation{badMut, badMut2}) 550 if err != nil { 551 t.Fatalf("Bulk mutating rows %q: %v", rowKeys, err) 552 } 553 if status == nil { 554 t.Errorf("No errors for bad bulk mutation") 555 } 556 if status[0] == nil || status[1] == nil { 557 t.Errorf("No error for bad bulk mutation") 558 } 559} 560 561func formatReadItem(ri ReadItem) string { 562 // Use the column qualifier only to make the test data briefer. 563 col := ri.Column[strings.Index(ri.Column, ":")+1:] 564 return fmt.Sprintf("%s-%s-%s", ri.Row, col, ri.Value) 565} 566 567func fill(b, sub []byte) { 568 for len(b) > len(sub) { 569 n := copy(b, sub) 570 b = b[n:] 571 } 572} 573 574type byColumn []ReadItem 575 576func (b byColumn) Len() int { return len(b) } 577func (b byColumn) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 578func (b byColumn) Less(i, j int) bool { return b[i].Column < b[j].Column } 579 580func clearTimestamps(r Row) { 581 for _, ris := range r { 582 for i := range ris { 583 ris[i].Timestamp = 0 584 } 585 } 586} 587