1/* 2Copyright 2015 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17/* 18Package bttest contains test helpers for working with the bigtable package. 19 20To use a Server, create it, and then connect to it with no security: 21(The project/instance values are ignored.) 22 srv, err := bttest.NewServer("localhost:0") 23 ... 24 conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) 25 ... 26 client, err := bigtable.NewClient(ctx, proj, instance, 27 option.WithGRPCConn(conn)) 28 ... 29*/ 30package bttest // import "cloud.google.com/go/bigtable/bttest" 31 32import ( 33 "bytes" 34 "context" 35 "encoding/binary" 36 "fmt" 37 "log" 38 "math/rand" 39 "net" 40 "regexp" 41 "sort" 42 "strings" 43 "sync" 44 "time" 45 46 emptypb "github.com/golang/protobuf/ptypes/empty" 47 "github.com/golang/protobuf/ptypes/wrappers" 48 "github.com/google/btree" 49 btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2" 50 btpb "google.golang.org/genproto/googleapis/bigtable/v2" 51 "google.golang.org/genproto/googleapis/longrunning" 52 statpb "google.golang.org/genproto/googleapis/rpc/status" 53 "google.golang.org/grpc" 54 "google.golang.org/grpc/codes" 55 "google.golang.org/grpc/status" 56 "rsc.io/binaryregexp" 57) 58 59const ( 60 // MilliSeconds field of the minimum valid Timestamp. 61 minValidMilliSeconds = 0 62 63 // MilliSeconds field of the max valid Timestamp. 64 maxValidMilliSeconds = int64(time.Millisecond) * 253402300800 65) 66 67var ( 68 validLabelTransformer = regexp.MustCompile(`[a-z0-9\-]{1,15}`) 69) 70 71// Server is an in-memory Cloud Bigtable fake. 72// It is unauthenticated, and only a rough approximation. 73type Server struct { 74 Addr string 75 76 l net.Listener 77 srv *grpc.Server 78 s *server 79} 80 81// server is the real implementation of the fake. 82// It is a separate and unexported type so the API won't be cluttered with 83// methods that are only relevant to the fake's implementation. 84type server struct { 85 mu sync.Mutex 86 tables map[string]*table // keyed by fully qualified name 87 instances map[string]*btapb.Instance // keyed by fully qualified name 88 gcc chan int // set when gcloop starts, closed when server shuts down 89 90 // Any unimplemented methods will cause a panic. 91 btapb.BigtableTableAdminServer 92 btapb.BigtableInstanceAdminServer 93 btpb.BigtableServer 94} 95 96// NewServer creates a new Server. 97// The Server will be listening for gRPC connections, without TLS, 98// on the provided address. The resolved address is named by the Addr field. 99func NewServer(laddr string, opt ...grpc.ServerOption) (*Server, error) { 100 l, err := net.Listen("tcp", laddr) 101 if err != nil { 102 return nil, err 103 } 104 105 s := &Server{ 106 Addr: l.Addr().String(), 107 l: l, 108 srv: grpc.NewServer(opt...), 109 s: &server{ 110 tables: make(map[string]*table), 111 instances: make(map[string]*btapb.Instance), 112 }, 113 } 114 btapb.RegisterBigtableInstanceAdminServer(s.srv, s.s) 115 btapb.RegisterBigtableTableAdminServer(s.srv, s.s) 116 btpb.RegisterBigtableServer(s.srv, s.s) 117 118 go s.srv.Serve(s.l) 119 120 return s, nil 121} 122 123// Close shuts down the server. 124func (s *Server) Close() { 125 s.s.mu.Lock() 126 if s.s.gcc != nil { 127 close(s.s.gcc) 128 } 129 s.s.mu.Unlock() 130 131 s.srv.Stop() 132 s.l.Close() 133} 134 135func (s *server) CreateTable(ctx context.Context, req *btapb.CreateTableRequest) (*btapb.Table, error) { 136 tbl := req.Parent + "/tables/" + req.TableId 137 138 s.mu.Lock() 139 if _, ok := s.tables[tbl]; ok { 140 s.mu.Unlock() 141 return nil, status.Errorf(codes.AlreadyExists, "table %q already exists", tbl) 142 } 143 s.tables[tbl] = newTable(req) 144 s.mu.Unlock() 145 146 ct := &btapb.Table{ 147 Name: tbl, 148 ColumnFamilies: req.GetTable().GetColumnFamilies(), 149 Granularity: req.GetTable().GetGranularity(), 150 } 151 if ct.Granularity == 0 { 152 ct.Granularity = btapb.Table_MILLIS 153 } 154 return ct, nil 155} 156 157func (s *server) CreateTableFromSnapshot(context.Context, *btapb.CreateTableFromSnapshotRequest) (*longrunning.Operation, error) { 158 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 159} 160 161func (s *server) ListTables(ctx context.Context, req *btapb.ListTablesRequest) (*btapb.ListTablesResponse, error) { 162 res := &btapb.ListTablesResponse{} 163 prefix := req.Parent + "/tables/" 164 165 s.mu.Lock() 166 for tbl := range s.tables { 167 if strings.HasPrefix(tbl, prefix) { 168 res.Tables = append(res.Tables, &btapb.Table{Name: tbl}) 169 } 170 } 171 s.mu.Unlock() 172 173 return res, nil 174} 175 176func (s *server) GetTable(ctx context.Context, req *btapb.GetTableRequest) (*btapb.Table, error) { 177 tbl := req.Name 178 179 s.mu.Lock() 180 tblIns, ok := s.tables[tbl] 181 s.mu.Unlock() 182 if !ok { 183 return nil, status.Errorf(codes.NotFound, "table %q not found", tbl) 184 } 185 186 return &btapb.Table{ 187 Name: tbl, 188 ColumnFamilies: toColumnFamilies(tblIns.columnFamilies()), 189 }, nil 190} 191 192func (s *server) DeleteTable(ctx context.Context, req *btapb.DeleteTableRequest) (*emptypb.Empty, error) { 193 s.mu.Lock() 194 defer s.mu.Unlock() 195 if _, ok := s.tables[req.Name]; !ok { 196 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 197 } 198 delete(s.tables, req.Name) 199 return &emptypb.Empty{}, nil 200} 201 202func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColumnFamiliesRequest) (*btapb.Table, error) { 203 s.mu.Lock() 204 tbl, ok := s.tables[req.Name] 205 s.mu.Unlock() 206 if !ok { 207 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 208 } 209 210 tbl.mu.Lock() 211 defer tbl.mu.Unlock() 212 213 for _, mod := range req.Modifications { 214 if create := mod.GetCreate(); create != nil { 215 if _, ok := tbl.families[mod.Id]; ok { 216 return nil, status.Errorf(codes.AlreadyExists, "family %q already exists", mod.Id) 217 } 218 newcf := &columnFamily{ 219 name: req.Name + "/columnFamilies/" + mod.Id, 220 order: tbl.counter, 221 gcRule: create.GcRule, 222 } 223 tbl.counter++ 224 tbl.families[mod.Id] = newcf 225 } else if mod.GetDrop() { 226 if _, ok := tbl.families[mod.Id]; !ok { 227 return nil, fmt.Errorf("can't delete unknown family %q", mod.Id) 228 } 229 delete(tbl.families, mod.Id) 230 } else if modify := mod.GetUpdate(); modify != nil { 231 if _, ok := tbl.families[mod.Id]; !ok { 232 return nil, fmt.Errorf("no such family %q", mod.Id) 233 } 234 newcf := &columnFamily{ 235 name: req.Name + "/columnFamilies/" + mod.Id, 236 gcRule: modify.GcRule, 237 } 238 // assume that we ALWAYS want to replace by the new setting 239 // we may need partial update through 240 tbl.families[mod.Id] = newcf 241 } 242 } 243 244 s.needGC() 245 return &btapb.Table{ 246 Name: req.Name, 247 ColumnFamilies: toColumnFamilies(tbl.families), 248 Granularity: btapb.Table_TimestampGranularity(btapb.Table_MILLIS), 249 }, nil 250} 251 252func (s *server) DropRowRange(ctx context.Context, req *btapb.DropRowRangeRequest) (*emptypb.Empty, error) { 253 s.mu.Lock() 254 defer s.mu.Unlock() 255 tbl, ok := s.tables[req.Name] 256 if !ok { 257 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 258 } 259 260 if req.GetDeleteAllDataFromTable() { 261 tbl.rows = btree.New(btreeDegree) 262 } else { 263 // Delete rows by prefix. 264 prefixBytes := req.GetRowKeyPrefix() 265 if prefixBytes == nil { 266 return nil, fmt.Errorf("missing row key prefix") 267 } 268 prefix := string(prefixBytes) 269 270 // The BTree does not specify what happens if rows are deleted during 271 // iteration, and it provides no "delete range" method. 272 // So we collect the rows first, then delete them one by one. 273 var rowsToDelete []*row 274 tbl.rows.AscendGreaterOrEqual(btreeKey(prefix), func(i btree.Item) bool { 275 r := i.(*row) 276 if strings.HasPrefix(r.key, prefix) { 277 rowsToDelete = append(rowsToDelete, r) 278 return true 279 } 280 return false // stop iteration 281 }) 282 for _, r := range rowsToDelete { 283 tbl.rows.Delete(r) 284 } 285 } 286 return &emptypb.Empty{}, nil 287} 288 289func (s *server) GenerateConsistencyToken(ctx context.Context, req *btapb.GenerateConsistencyTokenRequest) (*btapb.GenerateConsistencyTokenResponse, error) { 290 // Check that the table exists. 291 _, ok := s.tables[req.Name] 292 if !ok { 293 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 294 } 295 296 return &btapb.GenerateConsistencyTokenResponse{ 297 ConsistencyToken: "TokenFor-" + req.Name, 298 }, nil 299} 300 301func (s *server) CheckConsistency(ctx context.Context, req *btapb.CheckConsistencyRequest) (*btapb.CheckConsistencyResponse, error) { 302 // Check that the table exists. 303 _, ok := s.tables[req.Name] 304 if !ok { 305 return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name) 306 } 307 308 // Check this is the right token. 309 if req.ConsistencyToken != "TokenFor-"+req.Name { 310 return nil, status.Errorf(codes.InvalidArgument, "token %q not valid", req.ConsistencyToken) 311 } 312 313 // Single cluster instances are always consistent. 314 return &btapb.CheckConsistencyResponse{ 315 Consistent: true, 316 }, nil 317} 318 319func (s *server) SnapshotTable(context.Context, *btapb.SnapshotTableRequest) (*longrunning.Operation, error) { 320 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 321} 322 323func (s *server) GetSnapshot(context.Context, *btapb.GetSnapshotRequest) (*btapb.Snapshot, error) { 324 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 325} 326func (s *server) ListSnapshots(context.Context, *btapb.ListSnapshotsRequest) (*btapb.ListSnapshotsResponse, error) { 327 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 328} 329func (s *server) DeleteSnapshot(context.Context, *btapb.DeleteSnapshotRequest) (*emptypb.Empty, error) { 330 return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots") 331} 332 333func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRowsServer) error { 334 s.mu.Lock() 335 tbl, ok := s.tables[req.TableName] 336 s.mu.Unlock() 337 if !ok { 338 return status.Errorf(codes.NotFound, "table %q not found", req.TableName) 339 } 340 341 if err := validateRowRanges(req); err != nil { 342 return err 343 } 344 345 // Rows to read can be specified by a set of row keys and/or a set of row ranges. 346 // Output is a stream of sorted, de-duped rows. 347 tbl.mu.RLock() 348 rowSet := make(map[string]*row) 349 350 addRow := func(i btree.Item) bool { 351 r := i.(*row) 352 rowSet[r.key] = r 353 return true 354 } 355 356 if req.Rows != nil && 357 len(req.Rows.RowKeys)+len(req.Rows.RowRanges) > 0 { 358 // Add the explicitly given keys 359 for _, key := range req.Rows.RowKeys { 360 k := string(key) 361 if i := tbl.rows.Get(btreeKey(k)); i != nil { 362 addRow(i) 363 } 364 } 365 366 // Add keys from row ranges 367 for _, rr := range req.Rows.RowRanges { 368 var start, end string 369 switch sk := rr.StartKey.(type) { 370 case *btpb.RowRange_StartKeyClosed: 371 start = string(sk.StartKeyClosed) 372 case *btpb.RowRange_StartKeyOpen: 373 start = string(sk.StartKeyOpen) + "\x00" 374 } 375 switch ek := rr.EndKey.(type) { 376 case *btpb.RowRange_EndKeyClosed: 377 end = string(ek.EndKeyClosed) + "\x00" 378 case *btpb.RowRange_EndKeyOpen: 379 end = string(ek.EndKeyOpen) 380 } 381 switch { 382 case start == "" && end == "": 383 tbl.rows.Ascend(addRow) // all rows 384 case start == "": 385 tbl.rows.AscendLessThan(btreeKey(end), addRow) 386 case end == "": 387 tbl.rows.AscendGreaterOrEqual(btreeKey(start), addRow) 388 default: 389 tbl.rows.AscendRange(btreeKey(start), btreeKey(end), addRow) 390 } 391 } 392 } else { 393 // Read all rows 394 tbl.rows.Ascend(addRow) 395 } 396 tbl.mu.RUnlock() 397 398 rows := make([]*row, 0, len(rowSet)) 399 for _, r := range rowSet { 400 r.mu.Lock() 401 fams := len(r.families) 402 r.mu.Unlock() 403 404 if fams != 0 { 405 rows = append(rows, r) 406 } 407 } 408 sort.Sort(byRowKey(rows)) 409 410 limit := int(req.RowsLimit) 411 count := 0 412 for _, r := range rows { 413 if limit > 0 && count >= limit { 414 return nil 415 } 416 streamed, err := streamRow(stream, r, req.Filter) 417 if err != nil { 418 return err 419 } 420 if streamed { 421 count++ 422 } 423 } 424 return nil 425} 426 427// streamRow filters the given row and sends it via the given stream. 428// Returns true if at least one cell matched the filter and was streamed, false otherwise. 429func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (bool, error) { 430 r.mu.Lock() 431 nr := r.copy() 432 r.mu.Unlock() 433 r = nr 434 435 match, err := filterRow(f, r) 436 if err != nil { 437 return false, err 438 } 439 if !match { 440 return false, nil 441 } 442 443 rrr := &btpb.ReadRowsResponse{} 444 families := r.sortedFamilies() 445 for _, fam := range families { 446 for _, colName := range fam.colNames { 447 cells := fam.cells[colName] 448 if len(cells) == 0 { 449 continue 450 } 451 for _, cell := range cells { 452 rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{ 453 RowKey: []byte(r.key), 454 FamilyName: &wrappers.StringValue{Value: fam.name}, 455 Qualifier: &wrappers.BytesValue{Value: []byte(colName)}, 456 TimestampMicros: cell.ts, 457 Value: cell.value, 458 Labels: cell.labels, 459 }) 460 } 461 } 462 } 463 // We can't have a cell with just COMMIT set, which would imply a new empty cell. 464 // So modify the last cell to have the COMMIT flag set. 465 if len(rrr.Chunks) > 0 { 466 rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true} 467 } 468 469 return true, stream.Send(rrr) 470} 471 472// filterRow modifies a row with the given filter. Returns true if at least one cell from the row matches, 473// false otherwise. If a filter is invalid, filterRow returns false and an error. 474func filterRow(f *btpb.RowFilter, r *row) (bool, error) { 475 if f == nil { 476 return true, nil 477 } 478 // Handle filters that apply beyond just including/excluding cells. 479 switch f := f.Filter.(type) { 480 case *btpb.RowFilter_BlockAllFilter: 481 return !f.BlockAllFilter, nil 482 case *btpb.RowFilter_PassAllFilter: 483 return f.PassAllFilter, nil 484 case *btpb.RowFilter_Chain_: 485 for _, sub := range f.Chain.Filters { 486 match, err := filterRow(sub, r) 487 if err != nil { 488 return false, err 489 } 490 if !match { 491 return false, nil 492 } 493 } 494 return true, nil 495 case *btpb.RowFilter_Interleave_: 496 srs := make([]*row, 0, len(f.Interleave.Filters)) 497 for _, sub := range f.Interleave.Filters { 498 sr := r.copy() 499 match, err := filterRow(sub, sr) 500 if err != nil { 501 return false, err 502 } 503 if match { 504 srs = append(srs, sr) 505 } 506 } 507 // merge 508 // TODO(dsymonds): is this correct? 509 r.families = make(map[string]*family) 510 for _, sr := range srs { 511 for _, fam := range sr.families { 512 f := r.getOrCreateFamily(fam.name, fam.order) 513 for colName, cs := range fam.cells { 514 f.cells[colName] = append(f.cellsByColumn(colName), cs...) 515 } 516 } 517 } 518 var count int 519 for _, fam := range r.families { 520 for _, cs := range fam.cells { 521 sort.Sort(byDescTS(cs)) 522 count += len(cs) 523 } 524 } 525 return count > 0, nil 526 case *btpb.RowFilter_CellsPerColumnLimitFilter: 527 lim := int(f.CellsPerColumnLimitFilter) 528 for _, fam := range r.families { 529 for col, cs := range fam.cells { 530 if len(cs) > lim { 531 fam.cells[col] = cs[:lim] 532 } 533 } 534 } 535 return true, nil 536 case *btpb.RowFilter_Condition_: 537 match, err := filterRow(f.Condition.PredicateFilter, r.copy()) 538 if err != nil { 539 return false, err 540 } 541 if match { 542 if f.Condition.TrueFilter == nil { 543 return false, nil 544 } 545 return filterRow(f.Condition.TrueFilter, r) 546 } 547 if f.Condition.FalseFilter == nil { 548 return false, nil 549 } 550 return filterRow(f.Condition.FalseFilter, r) 551 case *btpb.RowFilter_RowKeyRegexFilter: 552 rx, err := newRegexp(f.RowKeyRegexFilter) 553 if err != nil { 554 return false, status.Errorf(codes.InvalidArgument, "Error in field 'rowkey_regex_filter' : %v", err) 555 } 556 if !rx.MatchString(r.key) { 557 return false, nil 558 } 559 case *btpb.RowFilter_CellsPerRowLimitFilter: 560 // Grab the first n cells in the row. 561 lim := int(f.CellsPerRowLimitFilter) 562 for _, fam := range r.families { 563 for _, col := range fam.colNames { 564 cs := fam.cells[col] 565 if len(cs) > lim { 566 fam.cells[col] = cs[:lim] 567 lim = 0 568 } else { 569 lim -= len(cs) 570 } 571 } 572 } 573 return true, nil 574 case *btpb.RowFilter_CellsPerRowOffsetFilter: 575 // Skip the first n cells in the row. 576 offset := int(f.CellsPerRowOffsetFilter) 577 for _, fam := range r.families { 578 for _, col := range fam.colNames { 579 cs := fam.cells[col] 580 if len(cs) > offset { 581 fam.cells[col] = cs[offset:] 582 offset = 0 583 return true, nil 584 } 585 fam.cells[col] = cs[:0] 586 offset -= len(cs) 587 } 588 } 589 return true, nil 590 case *btpb.RowFilter_RowSampleFilter: 591 // The row sample filter "matches all cells from a row with probability 592 // p, and matches no cells from the row with probability 1-p." 593 // See https://github.com/googleapis/googleapis/blob/master/google/bigtable/v2/data.proto 594 if f.RowSampleFilter <= 0.0 || f.RowSampleFilter >= 1.0 { 595 return false, status.Error(codes.InvalidArgument, "row_sample_filter argument must be between 0.0 and 1.0") 596 } 597 return randFloat() < f.RowSampleFilter, nil 598 } 599 600 // Any other case, operate on a per-cell basis. 601 cellCount := 0 602 for _, fam := range r.families { 603 for colName, cs := range fam.cells { 604 filtered, err := filterCells(f, fam.name, colName, cs) 605 if err != nil { 606 return false, err 607 } 608 fam.cells[colName] = filtered 609 cellCount += len(fam.cells[colName]) 610 } 611 } 612 return cellCount > 0, nil 613} 614 615var randFloat = rand.Float64 616 617func filterCells(f *btpb.RowFilter, fam, col string, cs []cell) ([]cell, error) { 618 var ret []cell 619 for _, cell := range cs { 620 include, err := includeCell(f, fam, col, cell) 621 if err != nil { 622 return nil, err 623 } 624 if include { 625 cell, err = modifyCell(f, cell) 626 if err != nil { 627 return nil, err 628 } 629 ret = append(ret, cell) 630 } 631 } 632 return ret, nil 633} 634 635func modifyCell(f *btpb.RowFilter, c cell) (cell, error) { 636 if f == nil { 637 return c, nil 638 } 639 // Consider filters that may modify the cell contents 640 switch filter := f.Filter.(type) { 641 case *btpb.RowFilter_StripValueTransformer: 642 return cell{ts: c.ts}, nil 643 case *btpb.RowFilter_ApplyLabelTransformer: 644 if !validLabelTransformer.MatchString(filter.ApplyLabelTransformer) { 645 return cell{}, status.Errorf( 646 codes.InvalidArgument, 647 `apply_label_transformer must match RE2([a-z0-9\-]+), but found %v`, 648 filter.ApplyLabelTransformer, 649 ) 650 } 651 return cell{ts: c.ts, value: c.value, labels: []string{filter.ApplyLabelTransformer}}, nil 652 default: 653 return c, nil 654 } 655} 656 657func includeCell(f *btpb.RowFilter, fam, col string, cell cell) (bool, error) { 658 if f == nil { 659 return true, nil 660 } 661 // TODO(dsymonds): Implement many more filters. 662 switch f := f.Filter.(type) { 663 case *btpb.RowFilter_CellsPerColumnLimitFilter: 664 // Don't log, row-level filter 665 return true, nil 666 case *btpb.RowFilter_RowKeyRegexFilter: 667 // Don't log, row-level filter 668 return true, nil 669 case *btpb.RowFilter_StripValueTransformer: 670 // Don't log, cell-modifying filter 671 return true, nil 672 case *btpb.RowFilter_ApplyLabelTransformer: 673 // Don't log, cell-modifying filter 674 return true, nil 675 default: 676 log.Printf("WARNING: don't know how to handle filter of type %T (ignoring it)", f) 677 return true, nil 678 case *btpb.RowFilter_FamilyNameRegexFilter: 679 rx, err := newRegexp([]byte(f.FamilyNameRegexFilter)) 680 if err != nil { 681 return false, status.Errorf(codes.InvalidArgument, "Error in field 'family_name_regex_filter' : %v", err) 682 } 683 return rx.MatchString(fam), nil 684 case *btpb.RowFilter_ColumnQualifierRegexFilter: 685 rx, err := newRegexp(f.ColumnQualifierRegexFilter) 686 if err != nil { 687 return false, status.Errorf(codes.InvalidArgument, "Error in field 'column_qualifier_regex_filter' : %v", err) 688 } 689 return rx.MatchString(col), nil 690 case *btpb.RowFilter_ValueRegexFilter: 691 rx, err := newRegexp(f.ValueRegexFilter) 692 if err != nil { 693 return false, status.Errorf(codes.InvalidArgument, "Error in field 'value_regex_filter' : %v", err) 694 } 695 return rx.Match(cell.value), nil 696 case *btpb.RowFilter_ColumnRangeFilter: 697 if fam != f.ColumnRangeFilter.FamilyName { 698 return false, nil 699 } 700 // Start qualifier defaults to empty string closed 701 inRangeStart := func() bool { return col >= "" } 702 switch sq := f.ColumnRangeFilter.StartQualifier.(type) { 703 case *btpb.ColumnRange_StartQualifierOpen: 704 inRangeStart = func() bool { return col > string(sq.StartQualifierOpen) } 705 case *btpb.ColumnRange_StartQualifierClosed: 706 inRangeStart = func() bool { return col >= string(sq.StartQualifierClosed) } 707 } 708 // End qualifier defaults to no upper boundary 709 inRangeEnd := func() bool { return true } 710 switch eq := f.ColumnRangeFilter.EndQualifier.(type) { 711 case *btpb.ColumnRange_EndQualifierClosed: 712 inRangeEnd = func() bool { return col <= string(eq.EndQualifierClosed) } 713 case *btpb.ColumnRange_EndQualifierOpen: 714 inRangeEnd = func() bool { return col < string(eq.EndQualifierOpen) } 715 } 716 return inRangeStart() && inRangeEnd(), nil 717 case *btpb.RowFilter_TimestampRangeFilter: 718 // Server should only support millisecond precision. 719 if f.TimestampRangeFilter.StartTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 || f.TimestampRangeFilter.EndTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 { 720 return false, status.Errorf(codes.InvalidArgument, "Error in field 'timestamp_range_filter'. Maximum precision allowed in filter is millisecond.\nGot:\nStart: %v\nEnd: %v", f.TimestampRangeFilter.StartTimestampMicros, f.TimestampRangeFilter.EndTimestampMicros) 721 } 722 // Lower bound is inclusive and defaults to 0, upper bound is exclusive and defaults to infinity. 723 return cell.ts >= f.TimestampRangeFilter.StartTimestampMicros && 724 (f.TimestampRangeFilter.EndTimestampMicros == 0 || cell.ts < f.TimestampRangeFilter.EndTimestampMicros), nil 725 case *btpb.RowFilter_ValueRangeFilter: 726 v := cell.value 727 // Start value defaults to empty string closed 728 inRangeStart := func() bool { return bytes.Compare(v, []byte{}) >= 0 } 729 switch sv := f.ValueRangeFilter.StartValue.(type) { 730 case *btpb.ValueRange_StartValueOpen: 731 inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueOpen) > 0 } 732 case *btpb.ValueRange_StartValueClosed: 733 inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueClosed) >= 0 } 734 } 735 // End value defaults to no upper boundary 736 inRangeEnd := func() bool { return true } 737 switch ev := f.ValueRangeFilter.EndValue.(type) { 738 case *btpb.ValueRange_EndValueClosed: 739 inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueClosed) <= 0 } 740 case *btpb.ValueRange_EndValueOpen: 741 inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueOpen) < 0 } 742 } 743 return inRangeStart() && inRangeEnd(), nil 744 } 745} 746 747func newRegexp(pat []byte) (*binaryregexp.Regexp, error) { 748 re, err := binaryregexp.Compile("^(?:" + string(pat) + ")$") // match entire target 749 if err != nil { 750 log.Printf("Bad pattern %q: %v", pat, err) 751 } 752 return re, err 753} 754 755func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*btpb.MutateRowResponse, error) { 756 s.mu.Lock() 757 tbl, ok := s.tables[req.TableName] 758 s.mu.Unlock() 759 if !ok { 760 return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) 761 } 762 fs := tbl.columnFamilies() 763 r := tbl.mutableRow(string(req.RowKey)) 764 r.mu.Lock() 765 defer r.mu.Unlock() 766 if err := applyMutations(tbl, r, req.Mutations, fs); err != nil { 767 return nil, err 768 } 769 return &btpb.MutateRowResponse{}, nil 770} 771 772func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_MutateRowsServer) error { 773 s.mu.Lock() 774 tbl, ok := s.tables[req.TableName] 775 s.mu.Unlock() 776 if !ok { 777 return status.Errorf(codes.NotFound, "table %q not found", req.TableName) 778 } 779 res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))} 780 781 fs := tbl.columnFamilies() 782 783 for i, entry := range req.Entries { 784 r := tbl.mutableRow(string(entry.RowKey)) 785 r.mu.Lock() 786 code, msg := int32(codes.OK), "" 787 if err := applyMutations(tbl, r, entry.Mutations, fs); err != nil { 788 code = int32(codes.Internal) 789 msg = err.Error() 790 } 791 res.Entries[i] = &btpb.MutateRowsResponse_Entry{ 792 Index: int64(i), 793 Status: &statpb.Status{Code: code, Message: msg}, 794 } 795 r.mu.Unlock() 796 } 797 return stream.Send(res) 798} 799 800func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutateRowRequest) (*btpb.CheckAndMutateRowResponse, error) { 801 s.mu.Lock() 802 tbl, ok := s.tables[req.TableName] 803 s.mu.Unlock() 804 if !ok { 805 return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) 806 } 807 res := &btpb.CheckAndMutateRowResponse{} 808 809 fs := tbl.columnFamilies() 810 811 r := tbl.mutableRow(string(req.RowKey)) 812 r.mu.Lock() 813 defer r.mu.Unlock() 814 815 // Figure out which mutation to apply. 816 whichMut := false 817 if req.PredicateFilter == nil { 818 // Use true_mutations iff row contains any cells. 819 whichMut = !r.isEmpty() 820 } else { 821 // Use true_mutations iff any cells in the row match the filter. 822 // TODO(dsymonds): This could be cheaper. 823 nr := r.copy() 824 825 match, err := filterRow(req.PredicateFilter, nr) 826 if err != nil { 827 return nil, err 828 } 829 whichMut = match && !nr.isEmpty() 830 } 831 res.PredicateMatched = whichMut 832 muts := req.FalseMutations 833 if whichMut { 834 muts = req.TrueMutations 835 } 836 837 if err := applyMutations(tbl, r, muts, fs); err != nil { 838 return nil, err 839 } 840 return res, nil 841} 842 843// applyMutations applies a sequence of mutations to a row. 844// fam should be a snapshot of the keys of tbl.families. 845// It assumes r.mu is locked. 846func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*columnFamily) error { 847 for _, mut := range muts { 848 switch mut := mut.Mutation.(type) { 849 default: 850 return fmt.Errorf("can't handle mutation type %T", mut) 851 case *btpb.Mutation_SetCell_: 852 set := mut.SetCell 853 if _, ok := fs[set.FamilyName]; !ok { 854 return fmt.Errorf("unknown family %q", set.FamilyName) 855 } 856 ts := set.TimestampMicros 857 if ts == -1 { // bigtable.ServerTime 858 ts = newTimestamp() 859 } 860 if !tbl.validTimestamp(ts) { 861 return fmt.Errorf("invalid timestamp %d", ts) 862 } 863 fam := set.FamilyName 864 col := string(set.ColumnQualifier) 865 866 newCell := cell{ts: ts, value: set.Value} 867 f := r.getOrCreateFamily(fam, fs[fam].order) 868 f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell) 869 case *btpb.Mutation_DeleteFromColumn_: 870 del := mut.DeleteFromColumn 871 if _, ok := fs[del.FamilyName]; !ok { 872 return fmt.Errorf("unknown family %q", del.FamilyName) 873 } 874 fam := del.FamilyName 875 col := string(del.ColumnQualifier) 876 if _, ok := r.families[fam]; ok { 877 cs := r.families[fam].cells[col] 878 if del.TimeRange != nil { 879 tsr := del.TimeRange 880 if !tbl.validTimestamp(tsr.StartTimestampMicros) { 881 return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros) 882 } 883 if !tbl.validTimestamp(tsr.EndTimestampMicros) && tsr.EndTimestampMicros != 0 { 884 return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros) 885 } 886 if tsr.StartTimestampMicros >= tsr.EndTimestampMicros && tsr.EndTimestampMicros != 0 { 887 return fmt.Errorf("inverted or invalid timestamp range [%d, %d]", tsr.StartTimestampMicros, tsr.EndTimestampMicros) 888 } 889 890 // Find half-open interval to remove. 891 // Cells are in descending timestamp order, 892 // so the predicates to sort.Search are inverted. 893 si, ei := 0, len(cs) 894 if tsr.StartTimestampMicros > 0 { 895 ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros }) 896 } 897 if tsr.EndTimestampMicros > 0 { 898 si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros }) 899 } 900 if si < ei { 901 copy(cs[si:], cs[ei:]) 902 cs = cs[:len(cs)-(ei-si)] 903 } 904 } else { 905 cs = nil 906 } 907 if len(cs) == 0 { 908 delete(r.families[fam].cells, col) 909 colNames := r.families[fam].colNames 910 i := sort.Search(len(colNames), func(i int) bool { return colNames[i] >= col }) 911 if i < len(colNames) && colNames[i] == col { 912 r.families[fam].colNames = append(colNames[:i], colNames[i+1:]...) 913 } 914 if len(r.families[fam].cells) == 0 { 915 delete(r.families, fam) 916 } 917 } else { 918 r.families[fam].cells[col] = cs 919 } 920 } 921 case *btpb.Mutation_DeleteFromRow_: 922 r.families = make(map[string]*family) 923 case *btpb.Mutation_DeleteFromFamily_: 924 fampre := mut.DeleteFromFamily.FamilyName 925 delete(r.families, fampre) 926 } 927 } 928 return nil 929} 930 931func maxTimestamp(x, y int64) int64 { 932 if x > y { 933 return x 934 } 935 return y 936} 937 938func newTimestamp() int64 { 939 ts := time.Now().UnixNano() / 1e3 940 ts -= ts % 1000 // round to millisecond granularity 941 return ts 942} 943 944func appendOrReplaceCell(cs []cell, newCell cell) []cell { 945 replaced := false 946 for i, cell := range cs { 947 if cell.ts == newCell.ts { 948 cs[i] = newCell 949 replaced = true 950 break 951 } 952 } 953 if !replaced { 954 cs = append(cs, newCell) 955 } 956 sort.Sort(byDescTS(cs)) 957 return cs 958} 959 960func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWriteRowRequest) (*btpb.ReadModifyWriteRowResponse, error) { 961 s.mu.Lock() 962 tbl, ok := s.tables[req.TableName] 963 s.mu.Unlock() 964 if !ok { 965 return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName) 966 } 967 968 fs := tbl.columnFamilies() 969 970 rowKey := string(req.RowKey) 971 r := tbl.mutableRow(rowKey) 972 resultRow := newRow(rowKey) // copy of updated cells 973 974 // This must be done before the row lock, acquired below, is released. 975 r.mu.Lock() 976 defer r.mu.Unlock() 977 // Assume all mutations apply to the most recent version of the cell. 978 // TODO(dsymonds): Verify this assumption and document it in the proto. 979 for _, rule := range req.Rules { 980 if _, ok := fs[rule.FamilyName]; !ok { 981 return nil, fmt.Errorf("unknown family %q", rule.FamilyName) 982 } 983 984 fam := rule.FamilyName 985 col := string(rule.ColumnQualifier) 986 isEmpty := false 987 f := r.getOrCreateFamily(fam, fs[fam].order) 988 cs := f.cells[col] 989 isEmpty = len(cs) == 0 990 991 ts := newTimestamp() 992 var newCell, prevCell cell 993 if !isEmpty { 994 cells := r.families[fam].cells[col] 995 prevCell = cells[0] 996 997 // ts is the max of now or the prev cell's timestamp in case the 998 // prev cell is in the future 999 ts = maxTimestamp(ts, prevCell.ts) 1000 } 1001 1002 switch rule := rule.Rule.(type) { 1003 default: 1004 return nil, fmt.Errorf("unknown RMW rule oneof %T", rule) 1005 case *btpb.ReadModifyWriteRule_AppendValue: 1006 newCell = cell{ts: ts, value: append(prevCell.value, rule.AppendValue...)} 1007 case *btpb.ReadModifyWriteRule_IncrementAmount: 1008 var v int64 1009 if !isEmpty { 1010 prevVal := prevCell.value 1011 if len(prevVal) != 8 { 1012 return nil, fmt.Errorf("increment on non-64-bit value") 1013 } 1014 v = int64(binary.BigEndian.Uint64(prevVal)) 1015 } 1016 v += rule.IncrementAmount 1017 var val [8]byte 1018 binary.BigEndian.PutUint64(val[:], uint64(v)) 1019 newCell = cell{ts: ts, value: val[:]} 1020 } 1021 1022 // Store the new cell 1023 f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell) 1024 1025 // Store a copy for the result row 1026 resultFamily := resultRow.getOrCreateFamily(fam, fs[fam].order) 1027 resultFamily.cellsByColumn(col) // create the column 1028 resultFamily.cells[col] = []cell{newCell} // overwrite the cells 1029 } 1030 1031 // Build the response using the result row 1032 res := &btpb.Row{ 1033 Key: req.RowKey, 1034 Families: make([]*btpb.Family, len(resultRow.families)), 1035 } 1036 1037 for i, family := range resultRow.sortedFamilies() { 1038 res.Families[i] = &btpb.Family{ 1039 Name: family.name, 1040 Columns: make([]*btpb.Column, len(family.colNames)), 1041 } 1042 1043 for j, colName := range family.colNames { 1044 res.Families[i].Columns[j] = &btpb.Column{ 1045 Qualifier: []byte(colName), 1046 Cells: []*btpb.Cell{{ 1047 TimestampMicros: family.cells[colName][0].ts, 1048 Value: family.cells[colName][0].value, 1049 }}, 1050 } 1051 } 1052 } 1053 return &btpb.ReadModifyWriteRowResponse{Row: res}, nil 1054} 1055 1056func (s *server) SampleRowKeys(req *btpb.SampleRowKeysRequest, stream btpb.Bigtable_SampleRowKeysServer) error { 1057 s.mu.Lock() 1058 tbl, ok := s.tables[req.TableName] 1059 s.mu.Unlock() 1060 if !ok { 1061 return status.Errorf(codes.NotFound, "table %q not found", req.TableName) 1062 } 1063 1064 tbl.mu.RLock() 1065 defer tbl.mu.RUnlock() 1066 1067 // The return value of SampleRowKeys is very loosely defined. Return at least the 1068 // final row key in the table and choose other row keys randomly. 1069 var offset int64 1070 var err error 1071 i := 0 1072 tbl.rows.Ascend(func(it btree.Item) bool { 1073 row := it.(*row) 1074 if i == tbl.rows.Len()-1 || rand.Int31n(100) == 0 { 1075 resp := &btpb.SampleRowKeysResponse{ 1076 RowKey: []byte(row.key), 1077 OffsetBytes: offset, 1078 } 1079 err = stream.Send(resp) 1080 if err != nil { 1081 return false 1082 } 1083 } 1084 offset += int64(row.size()) 1085 i++ 1086 return true 1087 }) 1088 return err 1089} 1090 1091// needGC is invoked whenever the server needs gcloop running. 1092func (s *server) needGC() { 1093 s.mu.Lock() 1094 if s.gcc == nil { 1095 s.gcc = make(chan int) 1096 go s.gcloop(s.gcc) 1097 } 1098 s.mu.Unlock() 1099} 1100 1101func (s *server) gcloop(done <-chan int) { 1102 const ( 1103 minWait = 500 // ms 1104 maxWait = 1500 // ms 1105 ) 1106 1107 for { 1108 // Wait for a random time interval. 1109 d := time.Duration(minWait+rand.Intn(maxWait-minWait)) * time.Millisecond 1110 select { 1111 case <-time.After(d): 1112 case <-done: 1113 return // server has been closed 1114 } 1115 1116 // Do a GC pass over all tables. 1117 var tables []*table 1118 s.mu.Lock() 1119 for _, tbl := range s.tables { 1120 tables = append(tables, tbl) 1121 } 1122 s.mu.Unlock() 1123 for _, tbl := range tables { 1124 tbl.gc() 1125 } 1126 } 1127} 1128 1129type table struct { 1130 mu sync.RWMutex 1131 counter uint64 // increment by 1 when a new family is created 1132 families map[string]*columnFamily // keyed by plain family name 1133 rows *btree.BTree // indexed by row key 1134} 1135 1136const btreeDegree = 16 1137 1138func newTable(ctr *btapb.CreateTableRequest) *table { 1139 fams := make(map[string]*columnFamily) 1140 c := uint64(0) 1141 if ctr.Table != nil { 1142 for id, cf := range ctr.Table.ColumnFamilies { 1143 fams[id] = &columnFamily{ 1144 name: ctr.Parent + "/columnFamilies/" + id, 1145 order: c, 1146 gcRule: cf.GcRule, 1147 } 1148 c++ 1149 } 1150 } 1151 return &table{ 1152 families: fams, 1153 counter: c, 1154 rows: btree.New(btreeDegree), 1155 } 1156} 1157 1158func (t *table) validTimestamp(ts int64) bool { 1159 if ts < minValidMilliSeconds || ts > maxValidMilliSeconds { 1160 return false 1161 } 1162 1163 // Assume millisecond granularity is required. 1164 return ts%1000 == 0 1165} 1166 1167func (t *table) columnFamilies() map[string]*columnFamily { 1168 cp := make(map[string]*columnFamily) 1169 t.mu.RLock() 1170 for fam, cf := range t.families { 1171 cp[fam] = cf 1172 } 1173 t.mu.RUnlock() 1174 return cp 1175} 1176 1177func (t *table) mutableRow(key string) *row { 1178 bkey := btreeKey(key) 1179 // Try fast path first. 1180 t.mu.RLock() 1181 i := t.rows.Get(bkey) 1182 t.mu.RUnlock() 1183 if i != nil { 1184 return i.(*row) 1185 } 1186 1187 // We probably need to create the row. 1188 t.mu.Lock() 1189 defer t.mu.Unlock() 1190 i = t.rows.Get(bkey) 1191 if i != nil { 1192 return i.(*row) 1193 } 1194 r := newRow(key) 1195 t.rows.ReplaceOrInsert(r) 1196 return r 1197} 1198 1199func (t *table) gc() { 1200 // This method doesn't add or remove rows, so we only need a read lock for the table. 1201 t.mu.RLock() 1202 defer t.mu.RUnlock() 1203 1204 // Gather GC rules we'll apply. 1205 rules := make(map[string]*btapb.GcRule) // keyed by "fam" 1206 for fam, cf := range t.families { 1207 if cf.gcRule != nil { 1208 rules[fam] = cf.gcRule 1209 } 1210 } 1211 if len(rules) == 0 { 1212 return 1213 } 1214 1215 t.rows.Ascend(func(i btree.Item) bool { 1216 r := i.(*row) 1217 r.mu.Lock() 1218 r.gc(rules) 1219 r.mu.Unlock() 1220 return true 1221 }) 1222} 1223 1224type byRowKey []*row 1225 1226func (b byRowKey) Len() int { return len(b) } 1227func (b byRowKey) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1228func (b byRowKey) Less(i, j int) bool { return b[i].key < b[j].key } 1229 1230type row struct { 1231 key string 1232 1233 mu sync.Mutex 1234 families map[string]*family // keyed by family name 1235} 1236 1237func newRow(key string) *row { 1238 return &row{ 1239 key: key, 1240 families: make(map[string]*family), 1241 } 1242} 1243 1244// copy returns a copy of the row. 1245// Cell values are aliased. 1246// r.mu should be held. 1247func (r *row) copy() *row { 1248 nr := newRow(r.key) 1249 for _, fam := range r.families { 1250 nr.families[fam.name] = &family{ 1251 name: fam.name, 1252 order: fam.order, 1253 colNames: fam.colNames, 1254 cells: make(map[string][]cell), 1255 } 1256 for col, cs := range fam.cells { 1257 // Copy the []cell slice, but not the []byte inside each cell. 1258 nr.families[fam.name].cells[col] = append([]cell(nil), cs...) 1259 } 1260 } 1261 return nr 1262} 1263 1264// isEmpty returns true if a row doesn't contain any cell 1265func (r *row) isEmpty() bool { 1266 for _, fam := range r.families { 1267 for _, cs := range fam.cells { 1268 if len(cs) > 0 { 1269 return false 1270 } 1271 } 1272 } 1273 return true 1274} 1275 1276// sortedFamilies returns a column family set 1277// sorted in ascending creation order in a row. 1278func (r *row) sortedFamilies() []*family { 1279 var families []*family 1280 for _, fam := range r.families { 1281 families = append(families, fam) 1282 } 1283 sort.Sort(byCreationOrder(families)) 1284 return families 1285} 1286 1287func (r *row) getOrCreateFamily(name string, order uint64) *family { 1288 if _, ok := r.families[name]; !ok { 1289 r.families[name] = &family{ 1290 name: name, 1291 order: order, 1292 cells: make(map[string][]cell), 1293 } 1294 } 1295 return r.families[name] 1296} 1297 1298// gc applies the given GC rules to the row. 1299// r.mu should be held. 1300func (r *row) gc(rules map[string]*btapb.GcRule) { 1301 for _, fam := range r.families { 1302 rule, ok := rules[fam.name] 1303 if !ok { 1304 continue 1305 } 1306 for col, cs := range fam.cells { 1307 r.families[fam.name].cells[col] = applyGC(cs, rule) 1308 } 1309 } 1310} 1311 1312// size returns the total size of all cell values in the row. 1313func (r *row) size() int { 1314 size := 0 1315 for _, fam := range r.families { 1316 for _, cells := range fam.cells { 1317 for _, cell := range cells { 1318 size += len(cell.value) 1319 } 1320 } 1321 } 1322 return size 1323} 1324 1325// Less implements btree.Less. 1326func (r *row) Less(i btree.Item) bool { 1327 return r.key < i.(*row).key 1328} 1329 1330// btreeKey returns a row for use as a key into the BTree. 1331func btreeKey(s string) *row { return &row{key: s} } 1332 1333func (r *row) String() string { 1334 return r.key 1335} 1336 1337var gcTypeWarn sync.Once 1338 1339// applyGC applies the given GC rule to the cells. 1340func applyGC(cells []cell, rule *btapb.GcRule) []cell { 1341 switch rule := rule.Rule.(type) { 1342 default: 1343 // TODO(dsymonds): Support GcRule_Intersection_ 1344 gcTypeWarn.Do(func() { 1345 log.Printf("Unsupported GC rule type %T", rule) 1346 }) 1347 case *btapb.GcRule_Union_: 1348 for _, sub := range rule.Union.Rules { 1349 cells = applyGC(cells, sub) 1350 } 1351 return cells 1352 case *btapb.GcRule_MaxAge: 1353 // Timestamps are in microseconds. 1354 cutoff := time.Now().UnixNano() / 1e3 1355 cutoff -= rule.MaxAge.Seconds * 1e6 1356 cutoff -= int64(rule.MaxAge.Nanos) / 1e3 1357 // The slice of cells in in descending timestamp order. 1358 // This sort.Search will return the index of the first cell whose timestamp is chronologically before the cutoff. 1359 si := sort.Search(len(cells), func(i int) bool { return cells[i].ts < cutoff }) 1360 if si < len(cells) { 1361 log.Printf("bttest: GC MaxAge(%v) deleted %d cells.", rule.MaxAge, len(cells)-si) 1362 } 1363 return cells[:si] 1364 case *btapb.GcRule_MaxNumVersions: 1365 n := int(rule.MaxNumVersions) 1366 if len(cells) > n { 1367 cells = cells[:n] 1368 } 1369 return cells 1370 } 1371 return cells 1372} 1373 1374type family struct { 1375 name string // Column family name 1376 order uint64 // Creation order of column family 1377 colNames []string // Column names are sorted in lexicographical ascending order 1378 cells map[string][]cell // Keyed by column name; cells are in descending timestamp order 1379} 1380 1381type byCreationOrder []*family 1382 1383func (b byCreationOrder) Len() int { return len(b) } 1384func (b byCreationOrder) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1385func (b byCreationOrder) Less(i, j int) bool { return b[i].order < b[j].order } 1386 1387// cellsByColumn adds the column name to colNames set if it does not exist 1388// and returns all cells within a column 1389func (f *family) cellsByColumn(name string) []cell { 1390 if _, ok := f.cells[name]; !ok { 1391 f.colNames = append(f.colNames, name) 1392 sort.Strings(f.colNames) 1393 } 1394 return f.cells[name] 1395} 1396 1397type cell struct { 1398 ts int64 1399 value []byte 1400 labels []string 1401} 1402 1403type byDescTS []cell 1404 1405func (b byDescTS) Len() int { return len(b) } 1406func (b byDescTS) Swap(i, j int) { b[i], b[j] = b[j], b[i] } 1407func (b byDescTS) Less(i, j int) bool { return b[i].ts > b[j].ts } 1408 1409type columnFamily struct { 1410 name string 1411 order uint64 // Creation order of column family 1412 gcRule *btapb.GcRule 1413} 1414 1415func (c *columnFamily) proto() *btapb.ColumnFamily { 1416 return &btapb.ColumnFamily{ 1417 GcRule: c.gcRule, 1418 } 1419} 1420 1421func toColumnFamilies(families map[string]*columnFamily) map[string]*btapb.ColumnFamily { 1422 fs := make(map[string]*btapb.ColumnFamily) 1423 for k, v := range families { 1424 fs[k] = v.proto() 1425 } 1426 return fs 1427} 1428