1/* 2Copyright 2015 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17/* 18Package bttest contains test helpers for working with the bigtable package. 19 20To use a Server, create it, and then connect to it with no security: 21(The project/instance values are ignored.) 22 srv, err := bttest.NewServer("localhost:0") 23 ... 24 conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) 25 ... 26 client, err := bigtable.NewClient(ctx, proj, instance, 27 option.WithGRPCConn(conn)) 28 ... 29*/ 30package bttest // import "cloud.google.com/go/bigtable/bttest" 31 32import ( 33 "bytes" 34 "context" 35 "encoding/binary" 36 "fmt" 37 "log" 38 "math" 39 "math/rand" 40 "net" 41 "regexp" 42 "sort" 43 "strings" 44 "sync" 45 "time" 46 47 emptypb "github.com/golang/protobuf/ptypes/empty" 48 "github.com/golang/protobuf/ptypes/wrappers" 49 "github.com/google/btree" 50 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 51 btpb "google.golang.org/genproto/googleapis/bigtable/v2" 52 "google.golang.org/genproto/googleapis/longrunning" 53 statpb "google.golang.org/genproto/googleapis/rpc/status" 54 "google.golang.org/grpc" 55 "google.golang.org/grpc/codes" 56 "google.golang.org/grpc/status" 57 "rsc.io/binaryregexp" 58) 59 60const ( 61 // MilliSeconds field of the minimum valid Timestamp. 62 minValidMilliSeconds = 0 63 64 // MilliSeconds field of the max valid Timestamp. 65 // Must match the max value of type TimestampMicros (int64) 66 // truncated to the millis granularity by subtracting a remainder of 1000. 67 maxValidMilliSeconds = math.MaxInt64 - math.MaxInt64%1000 68) 69 70var validLabelTransformer = regexp.MustCompile(`[a-z0-9\-]{1,15}`) 71 72// Server is an in-memory Cloud Bigtable fake. 73// It is unauthenticated, and only a rough approximation. 74type Server struct { 75 Addr string 76 77 l net.Listener 78 srv *grpc.Server 79 s *server 80} 81 82// server is the real implementation of the fake. 83// It is a separate and unexported type so the API won't be cluttered with 84// methods that are only relevant to the fake's implementation. 85type server struct { 86 mu sync.Mutex 87 tables map[string]*table // keyed by fully qualified name 88 instances map[string]*btapb.Instance // keyed by fully qualified name 89 gcc chan int // set when gcloop starts, closed when server shuts down 90 91 // Any unimplemented methods will cause a panic. 92 btapb.BigtableTableAdminServer 93 btapb.BigtableInstanceAdminServer 94 btpb.BigtableServer 95} 96 97// NewServer creates a new Server. 98// The Server will be listening for gRPC connections, without TLS, 99// on the provided address. The resolved address is named by the Addr field. 100func NewServer(laddr string, opt ...grpc.ServerOption) (*Server, error) { 101 l, err := net.Listen("tcp", laddr) 102 if err != nil { 103 return nil, err 104 } 105 106 s := &Server{ 107 Addr: l.Addr().String(), 108 l: l, 109 srv: grpc.NewServer(opt...), 110 s: &server{ 111 tables: make(map[string]*table), 112 instances: make(map[string]*btapb.Instance), 113 }, 114 } 115 btapb.RegisterBigtableInstanceAdminServer(s.srv, s.s) 116 btapb.RegisterBigtableTableAdminServer(s.srv, s.s) 117 btpb.RegisterBigtableServer(s.srv, s.s) 118 119 go s.srv.Serve(s.l) 120 121 return s, nil 122} 123 124// Close shuts down the server. 125func (s *Server) Close() { 126 s.s.mu.Lock() 127 if s.s.gcc != nil { 128 close(s.s.gcc) 129 } 130 s.s.mu.Unlock() 131 132 s.srv.Stop() 133 s.l.Close() 134} 135 136func (s *server) CreateTable(ctx context.Context, req *btapb.CreateTableRequest) (*btapb.Table, error) { 137 tbl := req.Parent + "/tables/" + req.TableId 138 139 s.mu.Lock() 140 if _, ok := s.tables[tbl]; ok { 141 s.mu.Unlock() 142 return nil, status.Errorf(codes.AlreadyExists, "table %q already exists", tbl) 143 } 144 s.tables[tbl] = newTable(req) 145 s.mu.Unlock() 146 147 ct := &btapb.Table{ 148 Name: tbl, 149 ColumnFamilies: req.GetTable().GetColumnFamilies(), 150 Granularity: req.GetTable().GetGranularity(), 151 } 152 if ct.Granularity == 0 { 153 ct.Granularity = btapb.Table_MILLIS 154 } 155 return ct, nil 156} 157 158func (s *server) CreateTableFromSnapshot(context.Context, *btapb.CreateTableFromSnapshotRequest) (*longrunning.Operation, error) { 159 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 160} 161 162func (s *server) ListTables(ctx context.Context, req *btapb.ListTablesRequest) (*btapb.ListTablesResponse, error) { 163 res := &btapb.ListTablesResponse{} 164 prefix := req.Parent + "/tables/" 165 166 s.mu.Lock() 167 for tbl := range s.tables { 168 if strings.HasPrefix(tbl, prefix) { 169 res.Tables = append(res.Tables, &btapb.Table{Name: tbl}) 170 } 171 } 172 s.mu.Unlock() 173 174 return res, nil 175} 176 177func (s *server) GetTable(ctx context.Context, req *btapb.GetTableRequest) (*btapb.Table, error) { 178 tbl := req.Name 179 180 s.mu.Lock() 181 tblIns, ok := s.tables[tbl] 182 s.mu.Unlock() 183 if !ok { 184 return nil, status.Errorf(codes.NotFound, "table %q not found", tbl) 185 } 186 187 return &btapb.Table{ 188 Name: tbl, 189 ColumnFamilies: toColumnFamilies(tblIns.columnFamilies()), 190 }, nil 191} 192 193func (s *server) DeleteTable(ctx context.Context, req *btapb.DeleteTableRequest) (*emptypb.Empty, error) { 194 s.mu.Lock() 195 defer s.mu.Unlock() 196 if _, ok := s.tables[req.Name]; !ok { 197 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 198 } 199 delete(s.tables, req.Name) 200 return &emptypb.Empty{}, nil 201} 202 203func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColumnFamiliesRequest) (*btapb.Table, error) { 204 s.mu.Lock() 205 tbl, ok := s.tables[req.Name] 206 s.mu.Unlock() 207 if !ok { 208 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 209 } 210 211 tbl.mu.Lock() 212 defer tbl.mu.Unlock() 213 214 for _, mod := range req.Modifications { 215 if create := mod.GetCreate(); create != nil { 216 if _, ok := tbl.families[mod.Id]; ok { 217 return nil, status.Errorf(codes.AlreadyExists, "family %q already exists", mod.Id) 218 } 219 newcf := &columnFamily{ 220 name: req.Name + "/columnFamilies/" + mod.Id, 221 order: tbl.counter, 222 gcRule: create.GcRule, 223 } 224 tbl.counter++ 225 tbl.families[mod.Id] = newcf 226 } else if mod.GetDrop() { 227 if _, ok := tbl.families[mod.Id]; !ok { 228 return nil, fmt.Errorf("can't delete unknown family %q", mod.Id) 229 } 230 delete(tbl.families, mod.Id) 231 232 // Purge all data for this column family 233 tbl.rows.Ascend(func(i btree.Item) bool { 234 r := i.(*row) 235 r.mu.Lock() 236 defer r.mu.Unlock() 237 delete(r.families, mod.Id) 238 return true 239 }) 240 } else if modify := mod.GetUpdate(); modify != nil { 241 if _, ok := tbl.families[mod.Id]; !ok { 242 return nil, fmt.Errorf("no such family %q", mod.Id) 243 } 244 newcf := &columnFamily{ 245 name: req.Name + "/columnFamilies/" + mod.Id, 246 gcRule: modify.GcRule, 247 } 248 // assume that we ALWAYS want to replace by the new setting 249 // we may need partial update through 250 tbl.families[mod.Id] = newcf 251 } 252 } 253 254 s.needGC() 255 return &btapb.Table{ 256 Name: req.Name, 257 ColumnFamilies: toColumnFamilies(tbl.families), 258 Granularity: btapb.Table_TimestampGranularity(btapb.Table_MILLIS), 259 }, nil 260} 261 262func (s *server) DropRowRange(ctx context.Context, req *btapb.DropRowRangeRequest) (*emptypb.Empty, error) { 263 s.mu.Lock() 264 tbl, ok := s.tables[req.Name] 265 s.mu.Unlock() 266 if !ok { 267 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 268 } 269 270 tbl.mu.Lock() 271 defer tbl.mu.Unlock() 272 if req.GetDeleteAllDataFromTable() { 273 tbl.rows = btree.New(btreeDegree) 274 } else { 275 // Delete rows by prefix. 276 prefixBytes := req.GetRowKeyPrefix() 277 if prefixBytes == nil { 278 return nil, fmt.Errorf("missing row key prefix") 279 } 280 prefix := string(prefixBytes) 281 282 // The BTree does not specify what happens if rows are deleted during 283 // iteration, and it provides no "delete range" method. 284 // So we collect the rows first, then delete them one by one. 285 var rowsToDelete []*row 286 tbl.rows.AscendGreaterOrEqual(btreeKey(prefix), func(i btree.Item) bool { 287 r := i.(*row) 288 if strings.HasPrefix(r.key, prefix) { 289 rowsToDelete = append(rowsToDelete, r) 290 return true 291 } 292 return false // stop iteration 293 }) 294 for _, r := range rowsToDelete { 295 tbl.rows.Delete(r) 296 } 297 } 298 return &emptypb.Empty{}, nil 299} 300 301func (s *server) GenerateConsistencyToken(ctx context.Context, req *btapb.GenerateConsistencyTokenRequest) (*btapb.GenerateConsistencyTokenResponse, error) { 302 // Check that the table exists. 303 _, ok := s.tables[req.Name] 304 if !ok { 305 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 306 } 307 308 return &btapb.GenerateConsistencyTokenResponse{ 309 ConsistencyToken: "TokenFor-" + req.Name, 310 }, nil 311} 312 313func (s *server) CheckConsistency(ctx context.Context, req *btapb.CheckConsistencyRequest) (*btapb.CheckConsistencyResponse, error) { 314 // Check that the table exists. 315 _, ok := s.tables[req.Name] 316 if !ok { 317 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 318 } 319 320 // Check this is the right token. 321 if req.ConsistencyToken != "TokenFor-"+req.Name { 322 return nil, status.Errorf(codes.InvalidArgument, "token %q not valid", req.ConsistencyToken) 323 } 324 325 // Single cluster instances are always consistent. 326 return &btapb.CheckConsistencyResponse{ 327 Consistent: true, 328 }, nil 329} 330 331func (s *server) SnapshotTable(context.Context, *btapb.SnapshotTableRequest) (*longrunning.Operation, error) { 332 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 333} 334 335func (s *server) GetSnapshot(context.Context, *btapb.GetSnapshotRequest) (*btapb.Snapshot, error) { 336 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 337} 338 339func (s *server) ListSnapshots(context.Context, *btapb.ListSnapshotsRequest) (*btapb.ListSnapshotsResponse, error) { 340 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 341} 342 343func (s *server) DeleteSnapshot(context.Context, *btapb.DeleteSnapshotRequest) (*emptypb.Empty, error) { 344 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 345} 346 347func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRowsServer) error { 348 s.mu.Lock() 349 tbl, ok := s.tables[req.TableName] 350 s.mu.Unlock() 351 if !ok { 352 return status.Errorf(codes.NotFound, "table %q not found", req.TableName) 353 } 354 355 if err := validateRowRanges(req); err != nil { 356 return err 357 } 358 359 // Rows to read can be specified by a set of row keys and/or a set of row ranges. 360 // Output is a stream of sorted, de-duped rows. 361 tbl.mu.RLock() 362 rowSet := make(map[string]*row) 363 364 addRow := func(i btree.Item) bool { 365 r := i.(*row) 366 rowSet[r.key] = r 367 return true 368 } 369 370 if req.Rows != nil && 371 len(req.Rows.RowKeys)+len(req.Rows.RowRanges) > 0 { 372 // Add the explicitly given keys 373 for _, key := range req.Rows.RowKeys { 374 k := string(key) 375 if i := tbl.rows.Get(btreeKey(k)); i != nil { 376 addRow(i) 377 } 378 } 379 380 // Add keys from row ranges 381 for _, rr := range req.Rows.RowRanges { 382 var start, end string 383 switch sk := rr.StartKey.(type) { 384 case *btpb.RowRange_StartKeyClosed: 385 start = string(sk.StartKeyClosed) 386 case *btpb.RowRange_StartKeyOpen: 387 start = string(sk.StartKeyOpen) + "\x00" 388 } 389 switch ek := rr.EndKey.(type) { 390 case *btpb.RowRange_EndKeyClosed: 391 end = string(ek.EndKeyClosed) + "\x00" 392 case *btpb.RowRange_EndKeyOpen: 393 end = string(ek.EndKeyOpen) 394 } 395 switch { 396 case start == "" && end == "": 397 tbl.rows.Ascend(addRow) // all rows 398 case start == "": 399 tbl.rows.AscendLessThan(btreeKey(end), addRow) 400 case end == "": 401 tbl.rows.AscendGreaterOrEqual(btreeKey(start), addRow) 402 default: 403 tbl.rows.AscendRange(btreeKey(start), btreeKey(end), addRow) 404 } 405 } 406 } else { 407 // Read all rows 408 tbl.rows.Ascend(addRow) 409 } 410 tbl.mu.RUnlock() 411 412 rows := make([]*row, 0, len(rowSet)) 413 for _, r := range rowSet { 414 r.mu.Lock() 415 fams := len(r.families) 416 r.mu.Unlock() 417 418 if fams != 0 { 419 rows = append(rows, r) 420 } 421 } 422 sort.Sort(byRowKey(rows)) 423 424 limit := int(req.RowsLimit) 425 count := 0 426 for _, r := range rows { 427 if limit > 0 && count >= limit { 428 return nil 429 } 430 streamed, err := streamRow(stream, r, req.Filter) 431 if err != nil { 432 return err 433 } 434 if streamed { 435 count++ 436 } 437 } 438 return nil 439} 440 441// streamRow filters the given row and sends it via the given stream. 442// Returns true if at least one cell matched the filter and was streamed, false otherwise. 443func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (bool, error) { 444 r.mu.Lock() 445 nr := r.copy() 446 r.mu.Unlock() 447 r = nr 448 449 match, err := filterRow(f, r) 450 if err != nil { 451 return false, err 452 } 453 if !match { 454 return false, nil 455 } 456 457 rrr := &btpb.ReadRowsResponse{} 458 families := r.sortedFamilies() 459 for _, fam := range families { 460 for _, colName := range fam.colNames { 461 cells := fam.cells[colName] 462 if len(cells) == 0 { 463 continue 464 } 465 for _, cell := range cells { 466 rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{ 467 RowKey: []byte(r.key), 468 FamilyName: &wrappers.StringValue{Value: fam.name}, 469 Qualifier: &wrappers.BytesValue{Value: []byte(colName)}, 470 TimestampMicros: cell.ts, 471 Value: cell.value, 472 Labels: cell.labels, 473 }) 474 } 475 } 476 } 477 // We can't have a cell with just COMMIT set, which would imply a new empty cell. 478 // So modify the last cell to have the COMMIT flag set. 479 if len(rrr.Chunks) > 0 { 480 rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true} 481 } 482 483 return true, stream.Send(rrr) 484} 485 486// filterRow modifies a row with the given filter. Returns true if at least one cell from the row matches, 487// false otherwise. If a filter is invalid, filterRow returns false and an error. 488func filterRow(f *btpb.RowFilter, r *row) (bool, error) { 489 if f == nil { 490 return true, nil 491 } 492 // Handle filters that apply beyond just including/excluding cells. 493 switch f := f.Filter.(type) { 494 case *btpb.RowFilter_BlockAllFilter: 495 if !f.BlockAllFilter { 496 return false, status.Errorf(codes.InvalidArgument, "block_all_filter must be true if set") 497 } 498 return false, nil 499 case *btpb.RowFilter_PassAllFilter: 500 if !f.PassAllFilter { 501 return false, status.Errorf(codes.InvalidArgument, "pass_all_filter must be true if set") 502 } 503 return true, nil 504 case *btpb.RowFilter_Chain_: 505 if len(f.Chain.Filters) < 2 { 506 return false, status.Errorf(codes.InvalidArgument, "Chain must contain at least two RowFilters") 507 } 508 for _, sub := range f.Chain.Filters { 509 match, err := filterRow(sub, r) 510 if err != nil { 511 return false, err 512 } 513 if !match { 514 return false, nil 515 } 516 } 517 return true, nil 518 case *btpb.RowFilter_Interleave_: 519 if len(f.Interleave.Filters) < 2 { 520 return false, status.Errorf(codes.InvalidArgument, "Interleave must contain at least two RowFilters") 521 } 522 srs := make([]*row, 0, len(f.Interleave.Filters)) 523 for _, sub := range f.Interleave.Filters { 524 sr := r.copy() 525 match, err := filterRow(sub, sr) 526 if err != nil { 527 return false, err 528 } 529 if match { 530 srs = append(srs, sr) 531 } 532 } 533 // merge 534 // TODO(dsymonds): is this correct? 535 r.families = make(map[string]*family) 536 for _, sr := range srs { 537 for _, fam := range sr.families { 538 f := r.getOrCreateFamily(fam.name, fam.order) 539 for colName, cs := range fam.cells { 540 f.cells[colName] = append(f.cellsByColumn(colName), cs...) 541 } 542 } 543 } 544 var count int 545 for _, fam := range r.families { 546 for _, cs := range fam.cells { 547 sort.Sort(byDescTS(cs)) 548 count += len(cs) 549 } 550 } 551 return count > 0, nil 552 case *btpb.RowFilter_CellsPerColumnLimitFilter: 553 lim := int(f.CellsPerColumnLimitFilter) 554 for _, fam := range r.families { 555 for col, cs := range fam.cells { 556 if len(cs) > lim { 557 fam.cells[col] = cs[:lim] 558 } 559 } 560 } 561 return true, nil 562 case *btpb.RowFilter_Condition_: 563 match, err := filterRow(f.Condition.PredicateFilter, r.copy()) 564 if err != nil { 565 return false, err 566 } 567 if match { 568 if f.Condition.TrueFilter == nil { 569 return false, nil 570 } 571 return filterRow(f.Condition.TrueFilter, r) 572 } 573 if f.Condition.FalseFilter == nil { 574 return false, nil 575 } 576 return filterRow(f.Condition.FalseFilter, r) 577 case *btpb.RowFilter_RowKeyRegexFilter: 578 rx, err := newRegexp(f.RowKeyRegexFilter) 579 if err != nil { 580 return false, status.Errorf(codes.InvalidArgument, "Error in field 'rowkey_regex_filter' : %v", err) 581 } 582 if !rx.MatchString(r.key) { 583 return false, nil 584 } 585 case *btpb.RowFilter_CellsPerRowLimitFilter: 586 // Grab the first n cells in the row. 587 lim := int(f.CellsPerRowLimitFilter) 588 for _, fam := range r.families { 589 for _, col := range fam.colNames { 590 cs := fam.cells[col] 591 if len(cs) > lim { 592 fam.cells[col] = cs[:lim] 593 lim = 0 594 } else { 595 lim -= len(cs) 596 } 597 } 598 } 599 return true, nil 600 case *btpb.RowFilter_CellsPerRowOffsetFilter: 601 // Skip the first n cells in the row. 602 offset := int(f.CellsPerRowOffsetFilter) 603 for _, fam := range r.families { 604 for _, col := range fam.colNames { 605 cs := fam.cells[col] 606 if len(cs) > offset { 607 fam.cells[col] = cs[offset:] 608 offset = 0 609 return true, nil 610 } 611 fam.cells[col] = cs[:0] 612 offset -= len(cs) 613 } 614 } 615 return true, nil 616 case *btpb.RowFilter_RowSampleFilter: 617 // The row sample filter "matches all cells from a row with probability 618 // p, and matches no cells from the row with probability 1-p." 619 // See https://github.com/googleapis/googleapis/blob/master/google/bigtable/v2/data.proto 620 if f.RowSampleFilter <= 0.0 || f.RowSampleFilter >= 1.0 { 621 return false, status.Error(codes.InvalidArgument, "row_sample_filter argument must be between 0.0 and 1.0") 622 } 623 return randFloat() < f.RowSampleFilter, nil 624 } 625 626 // Any other case, operate on a per-cell basis. 627 cellCount := 0 628 for _, fam := range r.families { 629 for colName, cs := range fam.cells { 630 filtered, err := filterCells(f, fam.name, colName, cs) 631 if err != nil { 632 return false, err 633 } 634 fam.cells[colName] = filtered 635 cellCount += len(fam.cells[colName]) 636 } 637 } 638 return cellCount > 0, nil 639} 640 641var randFloat = rand.Float64 642 643func filterCells(f *btpb.RowFilter, fam, col string, cs []cell) ([]cell, error) { 644 var ret []cell 645 for _, cell := range cs { 646 include, err := includeCell(f, fam, col, cell) 647 if err != nil { 648 return nil, err 649 } 650 if include { 651 cell, err = modifyCell(f, cell) 652 if err != nil { 653 return nil, err 654 } 655 ret = append(ret, cell) 656 } 657 } 658 return ret, nil 659} 660 661func modifyCell(f *btpb.RowFilter, c cell) (cell, error) { 662 if f == nil { 663 return c, nil 664 } 665 // Consider filters that may modify the cell contents 666 switch filter := f.Filter.(type) { 667 case *btpb.RowFilter_StripValueTransformer: 668 return cell{ts: c.ts}, nil 669 case *btpb.RowFilter_ApplyLabelTransformer: 670 if !validLabelTransformer.MatchString(filter.ApplyLabelTransformer) { 671 return cell{}, status.Errorf( 672 codes.InvalidArgument, 673 `apply_label_transformer must match RE2([a-z0-9\-]+), but found %v`, 674 filter.ApplyLabelTransformer, 675 ) 676 } 677 return cell{ts: c.ts, value: c.value, labels: []string{filter.ApplyLabelTransformer}}, nil 678 default: 679 return c, nil 680 } 681} 682 683func includeCell(f *btpb.RowFilter, fam, col string, cell cell) (bool, error) { 684 if f == nil { 685 return true, nil 686 } 687 // TODO(dsymonds): Implement many more filters. 688 switch f := f.Filter.(type) { 689 case *btpb.RowFilter_CellsPerColumnLimitFilter: 690 // Don't log, row-level filter 691 return true, nil 692 case *btpb.RowFilter_RowKeyRegexFilter: 693 // Don't log, row-level filter 694 return true, nil 695 case *btpb.RowFilter_StripValueTransformer: 696 // Don't log, cell-modifying filter 697 return true, nil 698 case *btpb.RowFilter_ApplyLabelTransformer: 699 // Don't log, cell-modifying filter 700 return true, nil 701 default: 702 log.Printf("WARNING: don't know how to handle filter of type %T (ignoring it)", f) 703 return true, nil 704 case *btpb.RowFilter_FamilyNameRegexFilter: 705 rx, err := newRegexp([]byte(f.FamilyNameRegexFilter)) 706 if err != nil { 707 return false, status.Errorf(codes.InvalidArgument, "Error in field 'family_name_regex_filter' : %v", err) 708 } 709 return rx.MatchString(fam), nil 710 case *btpb.RowFilter_ColumnQualifierRegexFilter: 711 rx, err := newRegexp(f.ColumnQualifierRegexFilter) 712 if err != nil { 713 return false, status.Errorf(codes.InvalidArgument, "Error in field 'column_qualifier_regex_filter' : %v", err) 714 } 715 return rx.MatchString(col), nil 716 case *btpb.RowFilter_ValueRegexFilter: 717 rx, err := newRegexp(f.ValueRegexFilter) 718 if err != nil { 719 return false, status.Errorf(codes.InvalidArgument, "Error in field 'value_regex_filter' : %v", err) 720 } 721 return rx.Match(cell.value), nil 722 case *btpb.RowFilter_ColumnRangeFilter: 723 if fam != f.ColumnRangeFilter.FamilyName { 724 return false, nil 725 } 726 // Start qualifier defaults to empty string closed 727 inRangeStart := func() bool { return col >= "" } 728 switch sq := f.ColumnRangeFilter.StartQualifier.(type) { 729 case *btpb.ColumnRange_StartQualifierOpen: 730 inRangeStart = func() bool { return col > string(sq.StartQualifierOpen) } 731 case *btpb.ColumnRange_StartQualifierClosed: 732 inRangeStart = func() bool { return col >= string(sq.StartQualifierClosed) } 733 } 734 // End qualifier defaults to no upper boundary 735 inRangeEnd := func() bool { return true } 736 switch eq := f.ColumnRangeFilter.EndQualifier.(type) { 737 case *btpb.ColumnRange_EndQualifierClosed: 738 inRangeEnd = func() bool { return col <= string(eq.EndQualifierClosed) } 739 case *btpb.ColumnRange_EndQualifierOpen: 740 inRangeEnd = func() bool { return col < string(eq.EndQualifierOpen) } 741 } 742 return inRangeStart() && inRangeEnd(), nil 743 case *btpb.RowFilter_TimestampRangeFilter: 744 // Server should only support millisecond precision. 745 if f.TimestampRangeFilter.StartTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 || f.TimestampRangeFilter.EndTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 { 746 return false, status.Errorf(codes.InvalidArgument, "Error in field 'timestamp_range_filter'. Maximum precision allowed in filter is millisecond.\nGot:\nStart: %v\nEnd: %v", f.TimestampRangeFilter.StartTimestampMicros, f.TimestampRangeFilter.EndTimestampMicros) 747 } 748 // Lower bound is inclusive and defaults to 0, upper bound is exclusive and defaults to infinity. 749 return cell.ts >= f.TimestampRangeFilter.StartTimestampMicros && 750 (f.TimestampRangeFilter.EndTimestampMicros == 0 || cell.ts < f.TimestampRangeFilter.EndTimestampMicros), nil 751 case *btpb.RowFilter_ValueRangeFilter: 752 v := cell.value 753 // Start value defaults to empty string closed 754 inRangeStart := func() bool { return bytes.Compare(v, []byte{}) >= 0 } 755 switch sv := f.ValueRangeFilter.StartValue.(type) { 756 case *btpb.ValueRange_StartValueOpen: 757 inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueOpen) > 0 } 758 case *btpb.ValueRange_StartValueClosed: 759 inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueClosed) >= 0 } 760 } 761 // End value defaults to no upper boundary 762 inRangeEnd := func() bool { return true } 763 switch ev := f.ValueRangeFilter.EndValue.(type) { 764 case *btpb.ValueRange_EndValueClosed: 765 inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueClosed) <= 0 } 766 case *btpb.ValueRange_EndValueOpen: 767 inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueOpen) < 0 } 768 } 769 return inRangeStart() && inRangeEnd(), nil 770 } 771} 772 773// escapeUTF is used to escape non-ASCII characters in pattern strings passed 774// to binaryregexp. This makes regexp column and row key matching work more 775// closely to what's seen with the real BigTable. 776func escapeUTF(in []byte) []byte { 777 var toEsc int 778 for _, c := range in { 779 if c > 127 { 780 toEsc++ 781 } 782 } 783 if toEsc == 0 { 784 return in 785 } 786 // Each escaped byte becomes 4 bytes (byte a1 becomes \xA1) 787 out := make([]byte, 0, len(in)+3*toEsc) 788 for _, c := range in { 789 if c > 127 { 790 h, l := c>>4, c&0xF 791 const conv = "0123456789ABCDEF" 792 out = append(out, '\\', 'x', conv[h], conv[l]) 793 } else { 794 out = append(out, c) 795 } 796 } 797 return out 798} 799 800func newRegexp(pat []byte) (*binaryregexp.Regexp, error) { 801 re, err := binaryregexp.Compile("^(?:" + string(escapeUTF(pat)) + ")$") // match entire target 802 if err != nil { 803 log.Printf("Bad pattern %q: %v", pat, err) 804 } 805 return re, err 806} 807 808func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*btpb.MutateRowResponse, error) { 809 s.mu.Lock() 810 tbl, ok := s.tables[req.TableName] 811 s.mu.Unlock() 812 if !ok { 813 return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) 814 } 815 fs := tbl.columnFamilies() 816 r := tbl.mutableRow(string(req.RowKey)) 817 r.mu.Lock() 818 defer r.mu.Unlock() 819 if err := applyMutations(tbl, r, req.Mutations, fs); err != nil { 820 return nil, err 821 } 822 return &btpb.MutateRowResponse{}, nil 823} 824 825func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_MutateRowsServer) error { 826 s.mu.Lock() 827 tbl, ok := s.tables[req.TableName] 828 s.mu.Unlock() 829 if !ok { 830 return status.Errorf(codes.NotFound, "table %q not found", req.TableName) 831 } 832 res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))} 833 834 fs := tbl.columnFamilies() 835 836 for i, entry := range req.Entries { 837 r := tbl.mutableRow(string(entry.RowKey)) 838 r.mu.Lock() 839 code, msg := int32(codes.OK), "" 840 if err := applyMutations(tbl, r, entry.Mutations, fs); err != nil { 841 code = int32(codes.Internal) 842 msg = err.Error() 843 } 844 res.Entries[i] = &btpb.MutateRowsResponse_Entry{ 845 Index: int64(i), 846 Status: &statpb.Status{Code: code, Message: msg}, 847 } 848 r.mu.Unlock() 849 } 850 return stream.Send(res) 851} 852 853func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutateRowRequest) (*btpb.CheckAndMutateRowResponse, error) { 854 s.mu.Lock() 855 tbl, ok := s.tables[req.TableName] 856 s.mu.Unlock() 857 if !ok { 858 return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) 859 } 860 res := &btpb.CheckAndMutateRowResponse{} 861 862 fs := tbl.columnFamilies() 863 864 r := tbl.mutableRow(string(req.RowKey)) 865 r.mu.Lock() 866 defer r.mu.Unlock() 867 868 // Figure out which mutation to apply. 869 whichMut := false 870 if req.PredicateFilter == nil { 871 // Use true_mutations iff row contains any cells. 872 whichMut = !r.isEmpty() 873 } else { 874 // Use true_mutations iff any cells in the row match the filter. 875 // TODO(dsymonds): This could be cheaper. 876 nr := r.copy() 877 878 match, err := filterRow(req.PredicateFilter, nr) 879 if err != nil { 880 return nil, err 881 } 882 whichMut = match && !nr.isEmpty() 883 } 884 res.PredicateMatched = whichMut 885 muts := req.FalseMutations 886 if whichMut { 887 muts = req.TrueMutations 888 } 889 890 if err := applyMutations(tbl, r, muts, fs); err != nil { 891 return nil, err 892 } 893 return res, nil 894} 895 896// applyMutations applies a sequence of mutations to a row. 897// fam should be a snapshot of the keys of tbl.families. 898// It assumes r.mu is locked. 899func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*columnFamily) error { 900 for _, mut := range muts { 901 switch mut := mut.Mutation.(type) { 902 default: 903 return fmt.Errorf("can't handle mutation type %T", mut) 904 case *btpb.Mutation_SetCell_: 905 set := mut.SetCell 906 if _, ok := fs[set.FamilyName]; !ok { 907 return fmt.Errorf("unknown family %q", set.FamilyName) 908 } 909 ts := set.TimestampMicros 910 if ts == -1 { // bigtable.ServerTime 911 ts = newTimestamp() 912 } 913 if !tbl.validTimestamp(ts) { 914 return fmt.Errorf("invalid timestamp %d", ts) 915 } 916 fam := set.FamilyName 917 col := string(set.ColumnQualifier) 918 919 newCell := cell{ts: ts, value: set.Value} 920 f := r.getOrCreateFamily(fam, fs[fam].order) 921 f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell) 922 case *btpb.Mutation_DeleteFromColumn_: 923 del := mut.DeleteFromColumn 924 if _, ok := fs[del.FamilyName]; !ok { 925 return fmt.Errorf("unknown family %q", del.FamilyName) 926 } 927 fam := del.FamilyName 928 col := string(del.ColumnQualifier) 929 if _, ok := r.families[fam]; ok { 930 cs := r.families[fam].cells[col] 931 if del.TimeRange != nil { 932 tsr := del.TimeRange 933 if !tbl.validTimestamp(tsr.StartTimestampMicros) { 934 return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros) 935 } 936 if !tbl.validTimestamp(tsr.EndTimestampMicros) && tsr.EndTimestampMicros != 0 { 937 return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros) 938 } 939 if tsr.StartTimestampMicros >= tsr.EndTimestampMicros && tsr.EndTimestampMicros != 0 { 940 return fmt.Errorf("inverted or invalid timestamp range [%d, %d]", tsr.StartTimestampMicros, tsr.EndTimestampMicros) 941 } 942 943 // Find half-open interval to remove. 944 // Cells are in descending timestamp order, 945 // so the predicates to sort.Search are inverted. 946 si, ei := 0, len(cs) 947 if tsr.StartTimestampMicros > 0 { 948 ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros }) 949 } 950 if tsr.EndTimestampMicros > 0 { 951 si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros }) 952 } 953 if si < ei { 954 copy(cs[si:], cs[ei:]) 955 cs = cs[:len(cs)-(ei-si)] 956 } 957 } else { 958 cs = nil 959 } 960 if len(cs) == 0 { 961 delete(r.families[fam].cells, col) 962 colNames := r.families[fam].colNames 963 i := sort.Search(len(colNames), func(i int) bool { return colNames[i] >= col }) 964 if i < len(colNames) && colNames[i] == col { 965 r.families[fam].colNames = append(colNames[:i], colNames[i+1:]...) 966 } 967 if len(r.families[fam].cells) == 0 { 968 delete(r.families, fam) 969 } 970 } else { 971 r.families[fam].cells[col] = cs 972 } 973 } 974 case *btpb.Mutation_DeleteFromRow_: 975 r.families = make(map[string]*family) 976 case *btpb.Mutation_DeleteFromFamily_: 977 fampre := mut.DeleteFromFamily.FamilyName 978 delete(r.families, fampre) 979 } 980 } 981 return nil 982} 983 984func maxTimestamp(x, y int64) int64 { 985 if x > y { 986 return x 987 } 988 return y 989} 990 991func newTimestamp() int64 { 992 ts := time.Now().UnixNano() / 1e3 993 ts -= ts % 1000 // round to millisecond granularity 994 return ts 995} 996 997func appendOrReplaceCell(cs []cell, newCell cell) []cell { 998 replaced := false 999 for i, cell := range cs { 1000 if cell.ts == newCell.ts { 1001 cs[i] = newCell 1002 replaced = true 1003 break 1004 } 1005 } 1006 if !replaced { 1007 cs = append(cs, newCell) 1008 } 1009 sort.Sort(byDescTS(cs)) 1010 return cs 1011} 1012 1013func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWriteRowRequest) (*btpb.ReadModifyWriteRowResponse, error) { 1014 s.mu.Lock() 1015 tbl, ok := s.tables[req.TableName] 1016 s.mu.Unlock() 1017 if !ok { 1018 return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) 1019 } 1020 1021 fs := tbl.columnFamilies() 1022 1023 rowKey := string(req.RowKey) 1024 r := tbl.mutableRow(rowKey) 1025 resultRow := newRow(rowKey) // copy of updated cells 1026 1027 // This must be done before the row lock, acquired below, is released. 1028 r.mu.Lock() 1029 defer r.mu.Unlock() 1030 // Assume all mutations apply to the most recent version of the cell. 1031 // TODO(dsymonds): Verify this assumption and document it in the proto. 1032 for _, rule := range req.Rules { 1033 if _, ok := fs[rule.FamilyName]; !ok { 1034 return nil, fmt.Errorf("unknown family %q", rule.FamilyName) 1035 } 1036 1037 fam := rule.FamilyName 1038 col := string(rule.ColumnQualifier) 1039 isEmpty := false 1040 f := r.getOrCreateFamily(fam, fs[fam].order) 1041 cs := f.cells[col] 1042 isEmpty = len(cs) == 0 1043 1044 ts := newTimestamp() 1045 var newCell, prevCell cell 1046 if !isEmpty { 1047 cells := r.families[fam].cells[col] 1048 prevCell = cells[0] 1049 1050 // ts is the max of now or the prev cell's timestamp in case the 1051 // prev cell is in the future 1052 ts = maxTimestamp(ts, prevCell.ts) 1053 } 1054 1055 switch rule := rule.Rule.(type) { 1056 default: 1057 return nil, fmt.Errorf("unknown RMW rule oneof %T", rule) 1058 case *btpb.ReadModifyWriteRule_AppendValue: 1059 newCell = cell{ts: ts, value: append(prevCell.value, rule.AppendValue...)} 1060 case *btpb.ReadModifyWriteRule_IncrementAmount: 1061 var v int64 1062 if !isEmpty { 1063 prevVal := prevCell.value 1064 if len(prevVal) != 8 { 1065 return nil, fmt.Errorf("increment on non-64-bit value") 1066 } 1067 v = int64(binary.BigEndian.Uint64(prevVal)) 1068 } 1069 v += rule.IncrementAmount 1070 var val [8]byte 1071 binary.BigEndian.PutUint64(val[:], uint64(v)) 1072 newCell = cell{ts: ts, value: val[:]} 1073 } 1074 1075 // Store the new cell 1076 f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell) 1077 1078 // Store a copy for the result row 1079 resultFamily := resultRow.getOrCreateFamily(fam, fs[fam].order) 1080 resultFamily.cellsByColumn(col) // create the column 1081 resultFamily.cells[col] = []cell{newCell} // overwrite the cells 1082 } 1083 1084 // Build the response using the result row 1085 res := &btpb.Row{ 1086 Key: req.RowKey, 1087 Families: make([]*btpb.Family, len(resultRow.families)), 1088 } 1089 1090 for i, family := range resultRow.sortedFamilies() { 1091 res.Families[i] = &btpb.Family{ 1092 Name: family.name, 1093 Columns: make([]*btpb.Column, len(family.colNames)), 1094 } 1095 1096 for j, colName := range family.colNames { 1097 res.Families[i].Columns[j] = &btpb.Column{ 1098 Qualifier: []byte(colName), 1099 Cells: []*btpb.Cell{{ 1100 TimestampMicros: family.cells[colName][0].ts, 1101 Value: family.cells[colName][0].value, 1102 }}, 1103 } 1104 } 1105 } 1106 return &btpb.ReadModifyWriteRowResponse{Row: res}, nil 1107} 1108 1109func (s *server) SampleRowKeys(req *btpb.SampleRowKeysRequest, stream btpb.Bigtable_SampleRowKeysServer) error { 1110 s.mu.Lock() 1111 tbl, ok := s.tables[req.TableName] 1112 s.mu.Unlock() 1113 if !ok { 1114 return status.Errorf(codes.NotFound, "table %q not found", req.TableName) 1115 } 1116 1117 tbl.mu.RLock() 1118 defer tbl.mu.RUnlock() 1119 1120 // The return value of SampleRowKeys is very loosely defined. Return at least the 1121 // final row key in the table and choose other row keys randomly. 1122 var offset int64 1123 var err error 1124 i := 0 1125 tbl.rows.Ascend(func(it btree.Item) bool { 1126 row := it.(*row) 1127 if i == tbl.rows.Len()-1 || rand.Int31n(100) == 0 { 1128 resp := &btpb.SampleRowKeysResponse{ 1129 RowKey: []byte(row.key), 1130 OffsetBytes: offset, 1131 } 1132 err = stream.Send(resp) 1133 if err != nil { 1134 return false 1135 } 1136 } 1137 offset += int64(row.size()) 1138 i++ 1139 return true 1140 }) 1141 return err 1142} 1143 1144// needGC is invoked whenever the server needs gcloop running. 1145func (s *server) needGC() { 1146 s.mu.Lock() 1147 if s.gcc == nil { 1148 s.gcc = make(chan int) 1149 go s.gcloop(s.gcc) 1150 } 1151 s.mu.Unlock() 1152} 1153 1154func (s *server) gcloop(done <-chan int) { 1155 const ( 1156 minWait = 500 // ms 1157 maxWait = 1500 // ms 1158 ) 1159 1160 for { 1161 // Wait for a random time interval. 1162 d := time.Duration(minWait+rand.Intn(maxWait-minWait)) * time.Millisecond 1163 select { 1164 case <-time.After(d): 1165 case <-done: 1166 return // server has been closed 1167 } 1168 1169 // Do a GC pass over all tables. 1170 var tables []*table 1171 s.mu.Lock() 1172 for _, tbl := range s.tables { 1173 tables = append(tables, tbl) 1174 } 1175 s.mu.Unlock() 1176 for _, tbl := range tables { 1177 tbl.gc() 1178 } 1179 } 1180} 1181 1182type table struct { 1183 mu sync.RWMutex 1184 counter uint64 // increment by 1 when a new family is created 1185 families map[string]*columnFamily // keyed by plain family name 1186 rows *btree.BTree // indexed by row key 1187} 1188 1189const btreeDegree = 16 1190 1191func newTable(ctr *btapb.CreateTableRequest) *table { 1192 fams := make(map[string]*columnFamily) 1193 c := uint64(0) 1194 if ctr.Table != nil { 1195 for id, cf := range ctr.Table.ColumnFamilies { 1196 fams[id] = &columnFamily{ 1197 name: ctr.Parent + "/columnFamilies/" + id, 1198 order: c, 1199 gcRule: cf.GcRule, 1200 } 1201 c++ 1202 } 1203 } 1204 return &table{ 1205 families: fams, 1206 counter: c, 1207 rows: btree.New(btreeDegree), 1208 } 1209} 1210 1211func (t *table) validTimestamp(ts int64) bool { 1212 if ts < minValidMilliSeconds || ts > maxValidMilliSeconds { 1213 return false 1214 } 1215 1216 // Assume millisecond granularity is required. 1217 return ts%1000 == 0 1218} 1219 1220func (t *table) columnFamilies() map[string]*columnFamily { 1221 cp := make(map[string]*columnFamily) 1222 t.mu.RLock() 1223 for fam, cf := range t.families { 1224 cp[fam] = cf 1225 } 1226 t.mu.RUnlock() 1227 return cp 1228} 1229 1230func (t *table) mutableRow(key string) *row { 1231 bkey := btreeKey(key) 1232 // Try fast path first. 1233 t.mu.RLock() 1234 i := t.rows.Get(bkey) 1235 t.mu.RUnlock() 1236 if i != nil { 1237 return i.(*row) 1238 } 1239 1240 // We probably need to create the row. 1241 t.mu.Lock() 1242 defer t.mu.Unlock() 1243 i = t.rows.Get(bkey) 1244 if i != nil { 1245 return i.(*row) 1246 } 1247 r := newRow(key) 1248 t.rows.ReplaceOrInsert(r) 1249 return r 1250} 1251 1252func (t *table) gc() { 1253 // This method doesn't add or remove rows, so we only need a read lock for the table. 1254 t.mu.RLock() 1255 defer t.mu.RUnlock() 1256 1257 // Gather GC rules we'll apply. 1258 rules := make(map[string]*btapb.GcRule) // keyed by "fam" 1259 for fam, cf := range t.families { 1260 if cf.gcRule != nil { 1261 rules[fam] = cf.gcRule 1262 } 1263 } 1264 if len(rules) == 0 { 1265 return 1266 } 1267 1268 t.rows.Ascend(func(i btree.Item) bool { 1269 r := i.(*row) 1270 r.mu.Lock() 1271 r.gc(rules) 1272 r.mu.Unlock() 1273 return true 1274 }) 1275} 1276 1277type byRowKey []*row 1278 1279func (b byRowKey) Len() int { return len(b) } 1280func (b byRowKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1281func (b byRowKey) Less(i, j int) bool { return b[i].key < b[j].key } 1282 1283type row struct { 1284 key string 1285 1286 mu sync.Mutex 1287 families map[string]*family // keyed by family name 1288} 1289 1290func newRow(key string) *row { 1291 return &row{ 1292 key: key, 1293 families: make(map[string]*family), 1294 } 1295} 1296 1297// copy returns a copy of the row. 1298// Cell values are aliased. 1299// r.mu should be held. 1300func (r *row) copy() *row { 1301 nr := newRow(r.key) 1302 for _, fam := range r.families { 1303 nr.families[fam.name] = &family{ 1304 name: fam.name, 1305 order: fam.order, 1306 colNames: fam.colNames, 1307 cells: make(map[string][]cell), 1308 } 1309 for col, cs := range fam.cells { 1310 // Copy the []cell slice, but not the []byte inside each cell. 1311 nr.families[fam.name].cells[col] = append([]cell(nil), cs...) 1312 } 1313 } 1314 return nr 1315} 1316 1317// isEmpty returns true if a row doesn't contain any cell 1318func (r *row) isEmpty() bool { 1319 for _, fam := range r.families { 1320 for _, cs := range fam.cells { 1321 if len(cs) > 0 { 1322 return false 1323 } 1324 } 1325 } 1326 return true 1327} 1328 1329// sortedFamilies returns a column family set 1330// sorted in ascending creation order in a row. 1331func (r *row) sortedFamilies() []*family { 1332 var families []*family 1333 for _, fam := range r.families { 1334 families = append(families, fam) 1335 } 1336 sort.Sort(byCreationOrder(families)) 1337 return families 1338} 1339 1340func (r *row) getOrCreateFamily(name string, order uint64) *family { 1341 if _, ok := r.families[name]; !ok { 1342 r.families[name] = &family{ 1343 name: name, 1344 order: order, 1345 cells: make(map[string][]cell), 1346 } 1347 } 1348 return r.families[name] 1349} 1350 1351// gc applies the given GC rules to the row. 1352// r.mu should be held. 1353func (r *row) gc(rules map[string]*btapb.GcRule) { 1354 for _, fam := range r.families { 1355 rule, ok := rules[fam.name] 1356 if !ok { 1357 continue 1358 } 1359 for col, cs := range fam.cells { 1360 r.families[fam.name].cells[col] = applyGC(cs, rule) 1361 } 1362 } 1363} 1364 1365// size returns the total size of all cell values in the row. 1366func (r *row) size() int { 1367 size := 0 1368 for _, fam := range r.families { 1369 for _, cells := range fam.cells { 1370 for _, cell := range cells { 1371 size += len(cell.value) 1372 } 1373 } 1374 } 1375 return size 1376} 1377 1378// Less implements btree.Less. 1379func (r *row) Less(i btree.Item) bool { 1380 return r.key < i.(*row).key 1381} 1382 1383// btreeKey returns a row for use as a key into the BTree. 1384func btreeKey(s string) *row { return &row{key: s} } 1385 1386func (r *row) String() string { 1387 return r.key 1388} 1389 1390var gcTypeWarn sync.Once 1391 1392// applyGC applies the given GC rule to the cells. 1393func applyGC(cells []cell, rule *btapb.GcRule) []cell { 1394 switch rule := rule.Rule.(type) { 1395 default: 1396 // TODO(dsymonds): Support GcRule_Intersection_ 1397 gcTypeWarn.Do(func() { 1398 log.Printf("Unsupported GC rule type %T", rule) 1399 }) 1400 case *btapb.GcRule_Union_: 1401 for _, sub := range rule.Union.Rules { 1402 cells = applyGC(cells, sub) 1403 } 1404 return cells 1405 case *btapb.GcRule_MaxAge: 1406 // Timestamps are in microseconds. 1407 cutoff := time.Now().UnixNano() / 1e3 1408 cutoff -= rule.MaxAge.Seconds * 1e6 1409 cutoff -= int64(rule.MaxAge.Nanos) / 1e3 1410 // The slice of cells in in descending timestamp order. 1411 // This sort.Search will return the index of the first cell whose timestamp is chronologically before the cutoff. 1412 si := sort.Search(len(cells), func(i int) bool { return cells[i].ts < cutoff }) 1413 if si < len(cells) { 1414 log.Printf("bttest: GC MaxAge(%v) deleted %d cells.", rule.MaxAge, len(cells)-si) 1415 } 1416 return cells[:si] 1417 case *btapb.GcRule_MaxNumVersions: 1418 n := int(rule.MaxNumVersions) 1419 if len(cells) > n { 1420 cells = cells[:n] 1421 } 1422 return cells 1423 } 1424 return cells 1425} 1426 1427type family struct { 1428 name string // Column family name 1429 order uint64 // Creation order of column family 1430 colNames []string // Column names are sorted in lexicographical ascending order 1431 cells map[string][]cell // Keyed by column name; cells are in descending timestamp order 1432} 1433 1434type byCreationOrder []*family 1435 1436func (b byCreationOrder) Len() int { return len(b) } 1437func (b byCreationOrder) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1438func (b byCreationOrder) Less(i, j int) bool { return b[i].order < b[j].order } 1439 1440// cellsByColumn adds the column name to colNames set if it does not exist 1441// and returns all cells within a column 1442func (f *family) cellsByColumn(name string) []cell { 1443 if _, ok := f.cells[name]; !ok { 1444 f.colNames = append(f.colNames, name) 1445 sort.Strings(f.colNames) 1446 } 1447 return f.cells[name] 1448} 1449 1450type cell struct { 1451 ts int64 1452 value []byte 1453 labels []string 1454} 1455 1456type byDescTS []cell 1457 1458func (b byDescTS) Len() int { return len(b) } 1459func (b byDescTS) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1460func (b byDescTS) Less(i, j int) bool { return b[i].ts > b[j].ts } 1461 1462type columnFamily struct { 1463 name string 1464 order uint64 // Creation order of column family 1465 gcRule *btapb.GcRule 1466} 1467 1468func (c *columnFamily) proto() *btapb.ColumnFamily { 1469 return &btapb.ColumnFamily{ 1470 GcRule: c.gcRule, 1471 } 1472} 1473 1474func toColumnFamilies(families map[string]*columnFamily) map[string]*btapb.ColumnFamily { 1475 fs := make(map[string]*btapb.ColumnFamily) 1476 for k, v := range families { 1477 fs[k] = v.proto() 1478 } 1479 return fs 1480} 1481