1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18Package bttest contains test helpers for working with the bigtable package.
19
To use a Server, create it and then connect to it without transport security
(any project and instance values may be passed to bigtable.NewClient):
22	srv, err := bttest.NewServer("localhost:0")
23	...
24	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
25	...
26	client, err := bigtable.NewClient(ctx, proj, instance,
27	        option.WithGRPCConn(conn))
28	...
29*/
30package bttest // import "cloud.google.com/go/bigtable/bttest"
31
32import (
33	"bytes"
34	"context"
35	"encoding/binary"
36	"fmt"
37	"log"
38	"math/rand"
39	"net"
40	"regexp"
41	"sort"
42	"strings"
43	"sync"
44	"time"
45
46	emptypb "github.com/golang/protobuf/ptypes/empty"
47	"github.com/golang/protobuf/ptypes/wrappers"
48	"github.com/google/btree"
49	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
50	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
51	"google.golang.org/genproto/googleapis/longrunning"
52	statpb "google.golang.org/genproto/googleapis/rpc/status"
53	"google.golang.org/grpc"
54	"google.golang.org/grpc/codes"
55	"google.golang.org/grpc/status"
56	"rsc.io/binaryregexp"
57)
58
// Bounds on the timestamps the emulator accepts.
const (
	// minValidMilliSeconds is the MilliSeconds field of the smallest valid Timestamp.
	minValidMilliSeconds = 0

	// maxValidMilliSeconds is the MilliSeconds field of the largest valid Timestamp.
	// 253402300800 is the Unix time, in seconds, of 10000-01-01T00:00:00Z.
	// Note that int64(time.Millisecond) is 1e6, so despite the name the value
	// equals that instant expressed in microseconds.
	maxValidMilliSeconds int64 = 253402300800 * int64(time.Millisecond)
)
66
var (
	// validLabelTransformer matches a complete label accepted by
	// ApplyLabelTransformer: 1 to 15 characters, each a lowercase ASCII letter,
	// a digit, or '-'. The pattern is anchored so that MatchString rejects
	// labels that merely contain a valid substring (e.g. "has space").
	validLabelTransformer = regexp.MustCompile(`^[a-z0-9\-]{1,15}$`)
)
70
71// Server is an in-memory Cloud Bigtable fake.
72// It is unauthenticated, and only a rough approximation.
73type Server struct {
74	Addr string
75
76	l   net.Listener
77	srv *grpc.Server
78	s   *server
79}
80
81// server is the real implementation of the fake.
82// It is a separate and unexported type so the API won't be cluttered with
83// methods that are only relevant to the fake's implementation.
84type server struct {
85	mu        sync.Mutex
86	tables    map[string]*table          // keyed by fully qualified name
87	instances map[string]*btapb.Instance // keyed by fully qualified name
88	gcc       chan int                   // set when gcloop starts, closed when server shuts down
89
90	// Any unimplemented methods will cause a panic.
91	btapb.BigtableTableAdminServer
92	btapb.BigtableInstanceAdminServer
93	btpb.BigtableServer
94}
95
96// NewServer creates a new Server.
97// The Server will be listening for gRPC connections, without TLS,
98// on the provided address. The resolved address is named by the Addr field.
99func NewServer(laddr string, opt ...grpc.ServerOption) (*Server, error) {
100	l, err := net.Listen("tcp", laddr)
101	if err != nil {
102		return nil, err
103	}
104
105	s := &Server{
106		Addr: l.Addr().String(),
107		l:    l,
108		srv:  grpc.NewServer(opt...),
109		s: &server{
110			tables:    make(map[string]*table),
111			instances: make(map[string]*btapb.Instance),
112		},
113	}
114	btapb.RegisterBigtableInstanceAdminServer(s.srv, s.s)
115	btapb.RegisterBigtableTableAdminServer(s.srv, s.s)
116	btpb.RegisterBigtableServer(s.srv, s.s)
117
118	go s.srv.Serve(s.l)
119
120	return s, nil
121}
122
123// Close shuts down the server.
124func (s *Server) Close() {
125	s.s.mu.Lock()
126	if s.s.gcc != nil {
127		close(s.s.gcc)
128	}
129	s.s.mu.Unlock()
130
131	s.srv.Stop()
132	s.l.Close()
133}
134
135func (s *server) CreateTable(ctx context.Context, req *btapb.CreateTableRequest) (*btapb.Table, error) {
136	tbl := req.Parent + "/tables/" + req.TableId
137
138	s.mu.Lock()
139	if _, ok := s.tables[tbl]; ok {
140		s.mu.Unlock()
141		return nil, status.Errorf(codes.AlreadyExists, "table %q already exists", tbl)
142	}
143	s.tables[tbl] = newTable(req)
144	s.mu.Unlock()
145
146	ct := &btapb.Table{
147		Name:           tbl,
148		ColumnFamilies: req.GetTable().GetColumnFamilies(),
149		Granularity:    req.GetTable().GetGranularity(),
150	}
151	if ct.Granularity == 0 {
152		ct.Granularity = btapb.Table_MILLIS
153	}
154	return ct, nil
155}
156
157func (s *server) CreateTableFromSnapshot(context.Context, *btapb.CreateTableFromSnapshotRequest) (*longrunning.Operation, error) {
158	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
159}
160
161func (s *server) ListTables(ctx context.Context, req *btapb.ListTablesRequest) (*btapb.ListTablesResponse, error) {
162	res := &btapb.ListTablesResponse{}
163	prefix := req.Parent + "/tables/"
164
165	s.mu.Lock()
166	for tbl := range s.tables {
167		if strings.HasPrefix(tbl, prefix) {
168			res.Tables = append(res.Tables, &btapb.Table{Name: tbl})
169		}
170	}
171	s.mu.Unlock()
172
173	return res, nil
174}
175
176func (s *server) GetTable(ctx context.Context, req *btapb.GetTableRequest) (*btapb.Table, error) {
177	tbl := req.Name
178
179	s.mu.Lock()
180	tblIns, ok := s.tables[tbl]
181	s.mu.Unlock()
182	if !ok {
183		return nil, status.Errorf(codes.NotFound, "table %q not found", tbl)
184	}
185
186	return &btapb.Table{
187		Name:           tbl,
188		ColumnFamilies: toColumnFamilies(tblIns.columnFamilies()),
189	}, nil
190}
191
192func (s *server) DeleteTable(ctx context.Context, req *btapb.DeleteTableRequest) (*emptypb.Empty, error) {
193	s.mu.Lock()
194	defer s.mu.Unlock()
195	if _, ok := s.tables[req.Name]; !ok {
196		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
197	}
198	delete(s.tables, req.Name)
199	return &emptypb.Empty{}, nil
200}
201
202func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColumnFamiliesRequest) (*btapb.Table, error) {
203	s.mu.Lock()
204	tbl, ok := s.tables[req.Name]
205	s.mu.Unlock()
206	if !ok {
207		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
208	}
209
210	tbl.mu.Lock()
211	defer tbl.mu.Unlock()
212
213	for _, mod := range req.Modifications {
214		if create := mod.GetCreate(); create != nil {
215			if _, ok := tbl.families[mod.Id]; ok {
216				return nil, status.Errorf(codes.AlreadyExists, "family %q already exists", mod.Id)
217			}
218			newcf := &columnFamily{
219				name:   req.Name + "/columnFamilies/" + mod.Id,
220				order:  tbl.counter,
221				gcRule: create.GcRule,
222			}
223			tbl.counter++
224			tbl.families[mod.Id] = newcf
225		} else if mod.GetDrop() {
226			if _, ok := tbl.families[mod.Id]; !ok {
227				return nil, fmt.Errorf("can't delete unknown family %q", mod.Id)
228			}
229			delete(tbl.families, mod.Id)
230		} else if modify := mod.GetUpdate(); modify != nil {
231			if _, ok := tbl.families[mod.Id]; !ok {
232				return nil, fmt.Errorf("no such family %q", mod.Id)
233			}
234			newcf := &columnFamily{
235				name:   req.Name + "/columnFamilies/" + mod.Id,
236				gcRule: modify.GcRule,
237			}
238			// assume that we ALWAYS want to replace by the new setting
239			// we may need partial update through
240			tbl.families[mod.Id] = newcf
241		}
242	}
243
244	s.needGC()
245	return &btapb.Table{
246		Name:           req.Name,
247		ColumnFamilies: toColumnFamilies(tbl.families),
248		Granularity:    btapb.Table_TimestampGranularity(btapb.Table_MILLIS),
249	}, nil
250}
251
252func (s *server) DropRowRange(ctx context.Context, req *btapb.DropRowRangeRequest) (*emptypb.Empty, error) {
253	s.mu.Lock()
254	defer s.mu.Unlock()
255	tbl, ok := s.tables[req.Name]
256	if !ok {
257		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
258	}
259
260	if req.GetDeleteAllDataFromTable() {
261		tbl.rows = btree.New(btreeDegree)
262	} else {
263		// Delete rows by prefix.
264		prefixBytes := req.GetRowKeyPrefix()
265		if prefixBytes == nil {
266			return nil, fmt.Errorf("missing row key prefix")
267		}
268		prefix := string(prefixBytes)
269
270		// The BTree does not specify what happens if rows are deleted during
271		// iteration, and it provides no "delete range" method.
272		// So we collect the rows first, then delete them one by one.
273		var rowsToDelete []*row
274		tbl.rows.AscendGreaterOrEqual(btreeKey(prefix), func(i btree.Item) bool {
275			r := i.(*row)
276			if strings.HasPrefix(r.key, prefix) {
277				rowsToDelete = append(rowsToDelete, r)
278				return true
279			}
280			return false // stop iteration
281		})
282		for _, r := range rowsToDelete {
283			tbl.rows.Delete(r)
284		}
285	}
286	return &emptypb.Empty{}, nil
287}
288
289func (s *server) GenerateConsistencyToken(ctx context.Context, req *btapb.GenerateConsistencyTokenRequest) (*btapb.GenerateConsistencyTokenResponse, error) {
290	// Check that the table exists.
291	_, ok := s.tables[req.Name]
292	if !ok {
293		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
294	}
295
296	return &btapb.GenerateConsistencyTokenResponse{
297		ConsistencyToken: "TokenFor-" + req.Name,
298	}, nil
299}
300
301func (s *server) CheckConsistency(ctx context.Context, req *btapb.CheckConsistencyRequest) (*btapb.CheckConsistencyResponse, error) {
302	// Check that the table exists.
303	_, ok := s.tables[req.Name]
304	if !ok {
305		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
306	}
307
308	// Check this is the right token.
309	if req.ConsistencyToken != "TokenFor-"+req.Name {
310		return nil, status.Errorf(codes.InvalidArgument, "token %q not valid", req.ConsistencyToken)
311	}
312
313	// Single cluster instances are always consistent.
314	return &btapb.CheckConsistencyResponse{
315		Consistent: true,
316	}, nil
317}
318
319func (s *server) SnapshotTable(context.Context, *btapb.SnapshotTableRequest) (*longrunning.Operation, error) {
320	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
321}
322
323func (s *server) GetSnapshot(context.Context, *btapb.GetSnapshotRequest) (*btapb.Snapshot, error) {
324	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
325}
326func (s *server) ListSnapshots(context.Context, *btapb.ListSnapshotsRequest) (*btapb.ListSnapshotsResponse, error) {
327	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
328}
329func (s *server) DeleteSnapshot(context.Context, *btapb.DeleteSnapshotRequest) (*emptypb.Empty, error) {
330	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
331}
332
333func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRowsServer) error {
334	s.mu.Lock()
335	tbl, ok := s.tables[req.TableName]
336	s.mu.Unlock()
337	if !ok {
338		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
339	}
340
341	if err := validateRowRanges(req); err != nil {
342		return err
343	}
344
345	// Rows to read can be specified by a set of row keys and/or a set of row ranges.
346	// Output is a stream of sorted, de-duped rows.
347	tbl.mu.RLock()
348	rowSet := make(map[string]*row)
349
350	addRow := func(i btree.Item) bool {
351		r := i.(*row)
352		rowSet[r.key] = r
353		return true
354	}
355
356	if req.Rows != nil &&
357		len(req.Rows.RowKeys)+len(req.Rows.RowRanges) > 0 {
358		// Add the explicitly given keys
359		for _, key := range req.Rows.RowKeys {
360			k := string(key)
361			if i := tbl.rows.Get(btreeKey(k)); i != nil {
362				addRow(i)
363			}
364		}
365
366		// Add keys from row ranges
367		for _, rr := range req.Rows.RowRanges {
368			var start, end string
369			switch sk := rr.StartKey.(type) {
370			case *btpb.RowRange_StartKeyClosed:
371				start = string(sk.StartKeyClosed)
372			case *btpb.RowRange_StartKeyOpen:
373				start = string(sk.StartKeyOpen) + "\x00"
374			}
375			switch ek := rr.EndKey.(type) {
376			case *btpb.RowRange_EndKeyClosed:
377				end = string(ek.EndKeyClosed) + "\x00"
378			case *btpb.RowRange_EndKeyOpen:
379				end = string(ek.EndKeyOpen)
380			}
381			switch {
382			case start == "" && end == "":
383				tbl.rows.Ascend(addRow) // all rows
384			case start == "":
385				tbl.rows.AscendLessThan(btreeKey(end), addRow)
386			case end == "":
387				tbl.rows.AscendGreaterOrEqual(btreeKey(start), addRow)
388			default:
389				tbl.rows.AscendRange(btreeKey(start), btreeKey(end), addRow)
390			}
391		}
392	} else {
393		// Read all rows
394		tbl.rows.Ascend(addRow)
395	}
396	tbl.mu.RUnlock()
397
398	rows := make([]*row, 0, len(rowSet))
399	for _, r := range rowSet {
400		r.mu.Lock()
401		fams := len(r.families)
402		r.mu.Unlock()
403
404		if fams != 0 {
405			rows = append(rows, r)
406		}
407	}
408	sort.Sort(byRowKey(rows))
409
410	limit := int(req.RowsLimit)
411	count := 0
412	for _, r := range rows {
413		if limit > 0 && count >= limit {
414			return nil
415		}
416		streamed, err := streamRow(stream, r, req.Filter)
417		if err != nil {
418			return err
419		}
420		if streamed {
421			count++
422		}
423	}
424	return nil
425}
426
427// streamRow filters the given row and sends it via the given stream.
428// Returns true if at least one cell matched the filter and was streamed, false otherwise.
429func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (bool, error) {
430	r.mu.Lock()
431	nr := r.copy()
432	r.mu.Unlock()
433	r = nr
434
435	match, err := filterRow(f, r)
436	if err != nil {
437		return false, err
438	}
439	if !match {
440		return false, nil
441	}
442
443	rrr := &btpb.ReadRowsResponse{}
444	families := r.sortedFamilies()
445	for _, fam := range families {
446		for _, colName := range fam.colNames {
447			cells := fam.cells[colName]
448			if len(cells) == 0 {
449				continue
450			}
451			for _, cell := range cells {
452				rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{
453					RowKey:          []byte(r.key),
454					FamilyName:      &wrappers.StringValue{Value: fam.name},
455					Qualifier:       &wrappers.BytesValue{Value: []byte(colName)},
456					TimestampMicros: cell.ts,
457					Value:           cell.value,
458					Labels:          cell.labels,
459				})
460			}
461		}
462	}
463	// We can't have a cell with just COMMIT set, which would imply a new empty cell.
464	// So modify the last cell to have the COMMIT flag set.
465	if len(rrr.Chunks) > 0 {
466		rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true}
467	}
468
469	return true, stream.Send(rrr)
470}
471
472// filterRow modifies a row with the given filter. Returns true if at least one cell from the row matches,
473// false otherwise. If a filter is invalid, filterRow returns false and an error.
474func filterRow(f *btpb.RowFilter, r *row) (bool, error) {
475	if f == nil {
476		return true, nil
477	}
478	// Handle filters that apply beyond just including/excluding cells.
479	switch f := f.Filter.(type) {
480	case *btpb.RowFilter_BlockAllFilter:
481		return !f.BlockAllFilter, nil
482	case *btpb.RowFilter_PassAllFilter:
483		return f.PassAllFilter, nil
484	case *btpb.RowFilter_Chain_:
485		for _, sub := range f.Chain.Filters {
486			match, err := filterRow(sub, r)
487			if err != nil {
488				return false, err
489			}
490			if !match {
491				return false, nil
492			}
493		}
494		return true, nil
495	case *btpb.RowFilter_Interleave_:
496		srs := make([]*row, 0, len(f.Interleave.Filters))
497		for _, sub := range f.Interleave.Filters {
498			sr := r.copy()
499			match, err := filterRow(sub, sr)
500			if err != nil {
501				return false, err
502			}
503			if match {
504				srs = append(srs, sr)
505			}
506		}
507		// merge
508		// TODO(dsymonds): is this correct?
509		r.families = make(map[string]*family)
510		for _, sr := range srs {
511			for _, fam := range sr.families {
512				f := r.getOrCreateFamily(fam.name, fam.order)
513				for colName, cs := range fam.cells {
514					f.cells[colName] = append(f.cellsByColumn(colName), cs...)
515				}
516			}
517		}
518		var count int
519		for _, fam := range r.families {
520			for _, cs := range fam.cells {
521				sort.Sort(byDescTS(cs))
522				count += len(cs)
523			}
524		}
525		return count > 0, nil
526	case *btpb.RowFilter_CellsPerColumnLimitFilter:
527		lim := int(f.CellsPerColumnLimitFilter)
528		for _, fam := range r.families {
529			for col, cs := range fam.cells {
530				if len(cs) > lim {
531					fam.cells[col] = cs[:lim]
532				}
533			}
534		}
535		return true, nil
536	case *btpb.RowFilter_Condition_:
537		match, err := filterRow(f.Condition.PredicateFilter, r.copy())
538		if err != nil {
539			return false, err
540		}
541		if match {
542			if f.Condition.TrueFilter == nil {
543				return false, nil
544			}
545			return filterRow(f.Condition.TrueFilter, r)
546		}
547		if f.Condition.FalseFilter == nil {
548			return false, nil
549		}
550		return filterRow(f.Condition.FalseFilter, r)
551	case *btpb.RowFilter_RowKeyRegexFilter:
552		rx, err := newRegexp(f.RowKeyRegexFilter)
553		if err != nil {
554			return false, status.Errorf(codes.InvalidArgument, "Error in field 'rowkey_regex_filter' : %v", err)
555		}
556		if !rx.MatchString(r.key) {
557			return false, nil
558		}
559	case *btpb.RowFilter_CellsPerRowLimitFilter:
560		// Grab the first n cells in the row.
561		lim := int(f.CellsPerRowLimitFilter)
562		for _, fam := range r.families {
563			for _, col := range fam.colNames {
564				cs := fam.cells[col]
565				if len(cs) > lim {
566					fam.cells[col] = cs[:lim]
567					lim = 0
568				} else {
569					lim -= len(cs)
570				}
571			}
572		}
573		return true, nil
574	case *btpb.RowFilter_CellsPerRowOffsetFilter:
575		// Skip the first n cells in the row.
576		offset := int(f.CellsPerRowOffsetFilter)
577		for _, fam := range r.families {
578			for _, col := range fam.colNames {
579				cs := fam.cells[col]
580				if len(cs) > offset {
581					fam.cells[col] = cs[offset:]
582					offset = 0
583					return true, nil
584				}
585				fam.cells[col] = cs[:0]
586				offset -= len(cs)
587			}
588		}
589		return true, nil
590	case *btpb.RowFilter_RowSampleFilter:
591		// The row sample filter "matches all cells from a row with probability
592		// p, and matches no cells from the row with probability 1-p."
593		// See https://github.com/googleapis/googleapis/blob/master/google/bigtable/v2/data.proto
594		if f.RowSampleFilter <= 0.0 || f.RowSampleFilter >= 1.0 {
595			return false, status.Error(codes.InvalidArgument, "row_sample_filter argument must be between 0.0 and 1.0")
596		}
597		return randFloat() < f.RowSampleFilter, nil
598	}
599
600	// Any other case, operate on a per-cell basis.
601	cellCount := 0
602	for _, fam := range r.families {
603		for colName, cs := range fam.cells {
604			filtered, err := filterCells(f, fam.name, colName, cs)
605			if err != nil {
606				return false, err
607			}
608			fam.cells[colName] = filtered
609			cellCount += len(fam.cells[colName])
610		}
611	}
612	return cellCount > 0, nil
613}
614
// randFloat supplies the random draws used by RowSampleFilter. It is a package
// variable rather than a direct call, presumably so tests can substitute a
// deterministic source.
var randFloat func() float64 = rand.Float64
616
617func filterCells(f *btpb.RowFilter, fam, col string, cs []cell) ([]cell, error) {
618	var ret []cell
619	for _, cell := range cs {
620		include, err := includeCell(f, fam, col, cell)
621		if err != nil {
622			return nil, err
623		}
624		if include {
625			cell, err = modifyCell(f, cell)
626			if err != nil {
627				return nil, err
628			}
629			ret = append(ret, cell)
630		}
631	}
632	return ret, nil
633}
634
635func modifyCell(f *btpb.RowFilter, c cell) (cell, error) {
636	if f == nil {
637		return c, nil
638	}
639	// Consider filters that may modify the cell contents
640	switch filter := f.Filter.(type) {
641	case *btpb.RowFilter_StripValueTransformer:
642		return cell{ts: c.ts}, nil
643	case *btpb.RowFilter_ApplyLabelTransformer:
644		if !validLabelTransformer.MatchString(filter.ApplyLabelTransformer) {
645			return cell{}, status.Errorf(
646				codes.InvalidArgument,
647				`apply_label_transformer must match RE2([a-z0-9\-]+), but found %v`,
648				filter.ApplyLabelTransformer,
649			)
650		}
651		return cell{ts: c.ts, value: c.value, labels: []string{filter.ApplyLabelTransformer}}, nil
652	default:
653		return c, nil
654	}
655}
656
657func includeCell(f *btpb.RowFilter, fam, col string, cell cell) (bool, error) {
658	if f == nil {
659		return true, nil
660	}
661	// TODO(dsymonds): Implement many more filters.
662	switch f := f.Filter.(type) {
663	case *btpb.RowFilter_CellsPerColumnLimitFilter:
664		// Don't log, row-level filter
665		return true, nil
666	case *btpb.RowFilter_RowKeyRegexFilter:
667		// Don't log, row-level filter
668		return true, nil
669	case *btpb.RowFilter_StripValueTransformer:
670		// Don't log, cell-modifying filter
671		return true, nil
672	case *btpb.RowFilter_ApplyLabelTransformer:
673		// Don't log, cell-modifying filter
674		return true, nil
675	default:
676		log.Printf("WARNING: don't know how to handle filter of type %T (ignoring it)", f)
677		return true, nil
678	case *btpb.RowFilter_FamilyNameRegexFilter:
679		rx, err := newRegexp([]byte(f.FamilyNameRegexFilter))
680		if err != nil {
681			return false, status.Errorf(codes.InvalidArgument, "Error in field 'family_name_regex_filter' : %v", err)
682		}
683		return rx.MatchString(fam), nil
684	case *btpb.RowFilter_ColumnQualifierRegexFilter:
685		rx, err := newRegexp(f.ColumnQualifierRegexFilter)
686		if err != nil {
687			return false, status.Errorf(codes.InvalidArgument, "Error in field 'column_qualifier_regex_filter' : %v", err)
688		}
689		return rx.MatchString(col), nil
690	case *btpb.RowFilter_ValueRegexFilter:
691		rx, err := newRegexp(f.ValueRegexFilter)
692		if err != nil {
693			return false, status.Errorf(codes.InvalidArgument, "Error in field 'value_regex_filter' : %v", err)
694		}
695		return rx.Match(cell.value), nil
696	case *btpb.RowFilter_ColumnRangeFilter:
697		if fam != f.ColumnRangeFilter.FamilyName {
698			return false, nil
699		}
700		// Start qualifier defaults to empty string closed
701		inRangeStart := func() bool { return col >= "" }
702		switch sq := f.ColumnRangeFilter.StartQualifier.(type) {
703		case *btpb.ColumnRange_StartQualifierOpen:
704			inRangeStart = func() bool { return col > string(sq.StartQualifierOpen) }
705		case *btpb.ColumnRange_StartQualifierClosed:
706			inRangeStart = func() bool { return col >= string(sq.StartQualifierClosed) }
707		}
708		// End qualifier defaults to no upper boundary
709		inRangeEnd := func() bool { return true }
710		switch eq := f.ColumnRangeFilter.EndQualifier.(type) {
711		case *btpb.ColumnRange_EndQualifierClosed:
712			inRangeEnd = func() bool { return col <= string(eq.EndQualifierClosed) }
713		case *btpb.ColumnRange_EndQualifierOpen:
714			inRangeEnd = func() bool { return col < string(eq.EndQualifierOpen) }
715		}
716		return inRangeStart() && inRangeEnd(), nil
717	case *btpb.RowFilter_TimestampRangeFilter:
718		// Server should only support millisecond precision.
719		if f.TimestampRangeFilter.StartTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 || f.TimestampRangeFilter.EndTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 {
720			return false, status.Errorf(codes.InvalidArgument, "Error in field 'timestamp_range_filter'. Maximum precision allowed in filter is millisecond.\nGot:\nStart: %v\nEnd: %v", f.TimestampRangeFilter.StartTimestampMicros, f.TimestampRangeFilter.EndTimestampMicros)
721		}
722		// Lower bound is inclusive and defaults to 0, upper bound is exclusive and defaults to infinity.
723		return cell.ts >= f.TimestampRangeFilter.StartTimestampMicros &&
724			(f.TimestampRangeFilter.EndTimestampMicros == 0 || cell.ts < f.TimestampRangeFilter.EndTimestampMicros), nil
725	case *btpb.RowFilter_ValueRangeFilter:
726		v := cell.value
727		// Start value defaults to empty string closed
728		inRangeStart := func() bool { return bytes.Compare(v, []byte{}) >= 0 }
729		switch sv := f.ValueRangeFilter.StartValue.(type) {
730		case *btpb.ValueRange_StartValueOpen:
731			inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueOpen) > 0 }
732		case *btpb.ValueRange_StartValueClosed:
733			inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueClosed) >= 0 }
734		}
735		// End value defaults to no upper boundary
736		inRangeEnd := func() bool { return true }
737		switch ev := f.ValueRangeFilter.EndValue.(type) {
738		case *btpb.ValueRange_EndValueClosed:
739			inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueClosed) <= 0 }
740		case *btpb.ValueRange_EndValueOpen:
741			inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueOpen) < 0 }
742		}
743		return inRangeStart() && inRangeEnd(), nil
744	}
745}
746
747func newRegexp(pat []byte) (*binaryregexp.Regexp, error) {
748	re, err := binaryregexp.Compile("^(?:" + string(pat) + ")$") // match entire target
749	if err != nil {
750		log.Printf("Bad pattern %q: %v", pat, err)
751	}
752	return re, err
753}
754
755func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*btpb.MutateRowResponse, error) {
756	s.mu.Lock()
757	tbl, ok := s.tables[req.TableName]
758	s.mu.Unlock()
759	if !ok {
760		return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
761	}
762	fs := tbl.columnFamilies()
763	r := tbl.mutableRow(string(req.RowKey))
764	r.mu.Lock()
765	defer r.mu.Unlock()
766	if err := applyMutations(tbl, r, req.Mutations, fs); err != nil {
767		return nil, err
768	}
769	return &btpb.MutateRowResponse{}, nil
770}
771
772func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_MutateRowsServer) error {
773	s.mu.Lock()
774	tbl, ok := s.tables[req.TableName]
775	s.mu.Unlock()
776	if !ok {
777		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
778	}
779	res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))}
780
781	fs := tbl.columnFamilies()
782
783	for i, entry := range req.Entries {
784		r := tbl.mutableRow(string(entry.RowKey))
785		r.mu.Lock()
786		code, msg := int32(codes.OK), ""
787		if err := applyMutations(tbl, r, entry.Mutations, fs); err != nil {
788			code = int32(codes.Internal)
789			msg = err.Error()
790		}
791		res.Entries[i] = &btpb.MutateRowsResponse_Entry{
792			Index:  int64(i),
793			Status: &statpb.Status{Code: code, Message: msg},
794		}
795		r.mu.Unlock()
796	}
797	return stream.Send(res)
798}
799
800func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutateRowRequest) (*btpb.CheckAndMutateRowResponse, error) {
801	s.mu.Lock()
802	tbl, ok := s.tables[req.TableName]
803	s.mu.Unlock()
804	if !ok {
805		return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
806	}
807	res := &btpb.CheckAndMutateRowResponse{}
808
809	fs := tbl.columnFamilies()
810
811	r := tbl.mutableRow(string(req.RowKey))
812	r.mu.Lock()
813	defer r.mu.Unlock()
814
815	// Figure out which mutation to apply.
816	whichMut := false
817	if req.PredicateFilter == nil {
818		// Use true_mutations iff row contains any cells.
819		whichMut = !r.isEmpty()
820	} else {
821		// Use true_mutations iff any cells in the row match the filter.
822		// TODO(dsymonds): This could be cheaper.
823		nr := r.copy()
824
825		match, err := filterRow(req.PredicateFilter, nr)
826		if err != nil {
827			return nil, err
828		}
829		whichMut = match && !nr.isEmpty()
830	}
831	res.PredicateMatched = whichMut
832	muts := req.FalseMutations
833	if whichMut {
834		muts = req.TrueMutations
835	}
836
837	if err := applyMutations(tbl, r, muts, fs); err != nil {
838		return nil, err
839	}
840	return res, nil
841}
842
// applyMutations applies muts, in order, to row r of table tbl.
//
// fs must be a snapshot of tbl's column families, as returned by
// tbl.columnFamilies(). It is used to reject SetCell and DeleteFromColumn
// mutations that name an unknown family, and to supply a family's order when
// r first gains that family.
// The caller must hold r.mu.
//
// It stops at the first mutation that fails and returns its error. Mutations
// earlier in muts have already been applied to r by then; nothing is rolled
// back.
func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*columnFamily) error {
	for _, mut := range muts {
		switch mut := mut.Mutation.(type) {
		default:
			return fmt.Errorf("can't handle mutation type %T", mut)
		case *btpb.Mutation_SetCell_:
			// Write one cell version. A cell already present at the same timestamp
			// is replaced rather than duplicated (appendOrReplaceCell).
			set := mut.SetCell
			if _, ok := fs[set.FamilyName]; !ok {
				return fmt.Errorf("unknown family %q", set.FamilyName)
			}
			ts := set.TimestampMicros
			if ts == -1 { // bigtable.ServerTime
				// -1 asks the server to choose the timestamp: use the current time.
				ts = newTimestamp()
			}
			if !tbl.validTimestamp(ts) {
				return fmt.Errorf("invalid timestamp %d", ts)
			}
			fam := set.FamilyName
			col := string(set.ColumnQualifier)

			newCell := cell{ts: ts, value: set.Value}
			f := r.getOrCreateFamily(fam, fs[fam].order)
			f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell)
		case *btpb.Mutation_DeleteFromColumn_:
			// Delete versions of one column: all of them, or, when TimeRange is
			// set, those with start <= ts < end (start 0 means no lower bound,
			// end 0 means no upper bound). A column left with no cells is
			// removed, and so is a family left with no columns.
			del := mut.DeleteFromColumn
			if _, ok := fs[del.FamilyName]; !ok {
				return fmt.Errorf("unknown family %q", del.FamilyName)
			}
			fam := del.FamilyName
			col := string(del.ColumnQualifier)
			if _, ok := r.families[fam]; ok {
				cs := r.families[fam].cells[col]
				if del.TimeRange != nil {
					tsr := del.TimeRange
					if !tbl.validTimestamp(tsr.StartTimestampMicros) {
						return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros)
					}
					if !tbl.validTimestamp(tsr.EndTimestampMicros) && tsr.EndTimestampMicros != 0 {
						return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros)
					}
					if tsr.StartTimestampMicros >= tsr.EndTimestampMicros && tsr.EndTimestampMicros != 0 {
						return fmt.Errorf("inverted or invalid timestamp range [%d, %d]", tsr.StartTimestampMicros, tsr.EndTimestampMicros)
					}

					// Find half-open interval to remove.
					// Cells are in descending timestamp order,
					// so the predicates to sort.Search are inverted.
					// si is the first cell with ts < end, ei the first with
					// ts < start, so cs[si:ei] are exactly the cells in range.
					si, ei := 0, len(cs)
					if tsr.StartTimestampMicros > 0 {
						ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros })
					}
					if tsr.EndTimestampMicros > 0 {
						si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros })
					}
					if si < ei {
						// Close the gap in place by shifting the tail down.
						copy(cs[si:], cs[ei:])
						cs = cs[:len(cs)-(ei-si)]
					}
				} else {
					cs = nil
				}
				if len(cs) == 0 {
					// Drop the column entirely, including its entry in the sorted
					// colNames list.
					delete(r.families[fam].cells, col)
					colNames := r.families[fam].colNames
					i := sort.Search(len(colNames), func(i int) bool { return colNames[i] >= col })
					if i < len(colNames) && colNames[i] == col {
						r.families[fam].colNames = append(colNames[:i], colNames[i+1:]...)
					}
					if len(r.families[fam].cells) == 0 {
						delete(r.families, fam)
					}
				} else {
					r.families[fam].cells[col] = cs
				}
			}
		case *btpb.Mutation_DeleteFromRow_:
			// Remove every family, leaving r with no cells.
			r.families = make(map[string]*family)
		case *btpb.Mutation_DeleteFromFamily_:
			// Remove one family from r. The name is not checked against fs, and
			// deleting a family r does not have is a no-op.
			fampre := mut.DeleteFromFamily.FamilyName
			delete(r.families, fampre)
		}
	}
	return nil
}
930
// maxTimestamp returns the larger of the two timestamps x and y.
func maxTimestamp(x, y int64) int64 {
	if y >= x {
		return y
	}
	return x
}
937
938func newTimestamp() int64 {
939	ts := time.Now().UnixNano() / 1e3
940	ts -= ts % 1000 // round to millisecond granularity
941	return ts
942}
943
944func appendOrReplaceCell(cs []cell, newCell cell) []cell {
945	replaced := false
946	for i, cell := range cs {
947		if cell.ts == newCell.ts {
948			cs[i] = newCell
949			replaced = true
950			break
951		}
952	}
953	if !replaced {
954		cs = append(cs, newCell)
955	}
956	sort.Sort(byDescTS(cs))
957	return cs
958}
959
960func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWriteRowRequest) (*btpb.ReadModifyWriteRowResponse, error) {
961	s.mu.Lock()
962	tbl, ok := s.tables[req.TableName]
963	s.mu.Unlock()
964	if !ok {
965		return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
966	}
967
968	fs := tbl.columnFamilies()
969
970	rowKey := string(req.RowKey)
971	r := tbl.mutableRow(rowKey)
972	resultRow := newRow(rowKey) // copy of updated cells
973
974	// This must be done before the row lock, acquired below, is released.
975	r.mu.Lock()
976	defer r.mu.Unlock()
977	// Assume all mutations apply to the most recent version of the cell.
978	// TODO(dsymonds): Verify this assumption and document it in the proto.
979	for _, rule := range req.Rules {
980		if _, ok := fs[rule.FamilyName]; !ok {
981			return nil, fmt.Errorf("unknown family %q", rule.FamilyName)
982		}
983
984		fam := rule.FamilyName
985		col := string(rule.ColumnQualifier)
986		isEmpty := false
987		f := r.getOrCreateFamily(fam, fs[fam].order)
988		cs := f.cells[col]
989		isEmpty = len(cs) == 0
990
991		ts := newTimestamp()
992		var newCell, prevCell cell
993		if !isEmpty {
994			cells := r.families[fam].cells[col]
995			prevCell = cells[0]
996
997			// ts is the max of now or the prev cell's timestamp in case the
998			// prev cell is in the future
999			ts = maxTimestamp(ts, prevCell.ts)
1000		}
1001
1002		switch rule := rule.Rule.(type) {
1003		default:
1004			return nil, fmt.Errorf("unknown RMW rule oneof %T", rule)
1005		case *btpb.ReadModifyWriteRule_AppendValue:
1006			newCell = cell{ts: ts, value: append(prevCell.value, rule.AppendValue...)}
1007		case *btpb.ReadModifyWriteRule_IncrementAmount:
1008			var v int64
1009			if !isEmpty {
1010				prevVal := prevCell.value
1011				if len(prevVal) != 8 {
1012					return nil, fmt.Errorf("increment on non-64-bit value")
1013				}
1014				v = int64(binary.BigEndian.Uint64(prevVal))
1015			}
1016			v += rule.IncrementAmount
1017			var val [8]byte
1018			binary.BigEndian.PutUint64(val[:], uint64(v))
1019			newCell = cell{ts: ts, value: val[:]}
1020		}
1021
1022		// Store the new cell
1023		f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell)
1024
1025		// Store a copy for the result row
1026		resultFamily := resultRow.getOrCreateFamily(fam, fs[fam].order)
1027		resultFamily.cellsByColumn(col)           // create the column
1028		resultFamily.cells[col] = []cell{newCell} // overwrite the cells
1029	}
1030
1031	// Build the response using the result row
1032	res := &btpb.Row{
1033		Key:      req.RowKey,
1034		Families: make([]*btpb.Family, len(resultRow.families)),
1035	}
1036
1037	for i, family := range resultRow.sortedFamilies() {
1038		res.Families[i] = &btpb.Family{
1039			Name:    family.name,
1040			Columns: make([]*btpb.Column, len(family.colNames)),
1041		}
1042
1043		for j, colName := range family.colNames {
1044			res.Families[i].Columns[j] = &btpb.Column{
1045				Qualifier: []byte(colName),
1046				Cells: []*btpb.Cell{{
1047					TimestampMicros: family.cells[colName][0].ts,
1048					Value:           family.cells[colName][0].value,
1049				}},
1050			}
1051		}
1052	}
1053	return &btpb.ReadModifyWriteRowResponse{Row: res}, nil
1054}
1055
1056func (s *server) SampleRowKeys(req *btpb.SampleRowKeysRequest, stream btpb.Bigtable_SampleRowKeysServer) error {
1057	s.mu.Lock()
1058	tbl, ok := s.tables[req.TableName]
1059	s.mu.Unlock()
1060	if !ok {
1061		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
1062	}
1063
1064	tbl.mu.RLock()
1065	defer tbl.mu.RUnlock()
1066
1067	// The return value of SampleRowKeys is very loosely defined. Return at least the
1068	// final row key in the table and choose other row keys randomly.
1069	var offset int64
1070	var err error
1071	i := 0
1072	tbl.rows.Ascend(func(it btree.Item) bool {
1073		row := it.(*row)
1074		if i == tbl.rows.Len()-1 || rand.Int31n(100) == 0 {
1075			resp := &btpb.SampleRowKeysResponse{
1076				RowKey:      []byte(row.key),
1077				OffsetBytes: offset,
1078			}
1079			err = stream.Send(resp)
1080			if err != nil {
1081				return false
1082			}
1083		}
1084		offset += int64(row.size())
1085		i++
1086		return true
1087	})
1088	return err
1089}
1090
1091// needGC is invoked whenever the server needs gcloop running.
1092func (s *server) needGC() {
1093	s.mu.Lock()
1094	if s.gcc == nil {
1095		s.gcc = make(chan int)
1096		go s.gcloop(s.gcc)
1097	}
1098	s.mu.Unlock()
1099}
1100
1101func (s *server) gcloop(done <-chan int) {
1102	const (
1103		minWait = 500  // ms
1104		maxWait = 1500 // ms
1105	)
1106
1107	for {
1108		// Wait for a random time interval.
1109		d := time.Duration(minWait+rand.Intn(maxWait-minWait)) * time.Millisecond
1110		select {
1111		case <-time.After(d):
1112		case <-done:
1113			return // server has been closed
1114		}
1115
1116		// Do a GC pass over all tables.
1117		var tables []*table
1118		s.mu.Lock()
1119		for _, tbl := range s.tables {
1120			tables = append(tables, tbl)
1121		}
1122		s.mu.Unlock()
1123		for _, tbl := range tables {
1124			tbl.gc()
1125		}
1126	}
1127}
1128
// table is the in-memory state of one emulated Bigtable table.
type table struct {
	// mu guards families and rows: readers such as columnFamilies take the
	// read lock, and mutableRow takes the write lock to insert rows.
	mu       sync.RWMutex
	counter  uint64                   // increment by 1 when a new family is created
	families map[string]*columnFamily // keyed by plain family name
	rows     *btree.BTree             // indexed by row key
}
1135
// btreeDegree is the degree passed to btree.New when creating a table's row index.
const btreeDegree = 16
1137
1138func newTable(ctr *btapb.CreateTableRequest) *table {
1139	fams := make(map[string]*columnFamily)
1140	c := uint64(0)
1141	if ctr.Table != nil {
1142		for id, cf := range ctr.Table.ColumnFamilies {
1143			fams[id] = &columnFamily{
1144				name:   ctr.Parent + "/columnFamilies/" + id,
1145				order:  c,
1146				gcRule: cf.GcRule,
1147			}
1148			c++
1149		}
1150	}
1151	return &table{
1152		families: fams,
1153		counter:  c,
1154		rows:     btree.New(btreeDegree),
1155	}
1156}
1157
1158func (t *table) validTimestamp(ts int64) bool {
1159	if ts < minValidMilliSeconds || ts > maxValidMilliSeconds {
1160		return false
1161	}
1162
1163	// Assume millisecond granularity is required.
1164	return ts%1000 == 0
1165}
1166
1167func (t *table) columnFamilies() map[string]*columnFamily {
1168	cp := make(map[string]*columnFamily)
1169	t.mu.RLock()
1170	for fam, cf := range t.families {
1171		cp[fam] = cf
1172	}
1173	t.mu.RUnlock()
1174	return cp
1175}
1176
1177func (t *table) mutableRow(key string) *row {
1178	bkey := btreeKey(key)
1179	// Try fast path first.
1180	t.mu.RLock()
1181	i := t.rows.Get(bkey)
1182	t.mu.RUnlock()
1183	if i != nil {
1184		return i.(*row)
1185	}
1186
1187	// We probably need to create the row.
1188	t.mu.Lock()
1189	defer t.mu.Unlock()
1190	i = t.rows.Get(bkey)
1191	if i != nil {
1192		return i.(*row)
1193	}
1194	r := newRow(key)
1195	t.rows.ReplaceOrInsert(r)
1196	return r
1197}
1198
1199func (t *table) gc() {
1200	// This method doesn't add or remove rows, so we only need a read lock for the table.
1201	t.mu.RLock()
1202	defer t.mu.RUnlock()
1203
1204	// Gather GC rules we'll apply.
1205	rules := make(map[string]*btapb.GcRule) // keyed by "fam"
1206	for fam, cf := range t.families {
1207		if cf.gcRule != nil {
1208			rules[fam] = cf.gcRule
1209		}
1210	}
1211	if len(rules) == 0 {
1212		return
1213	}
1214
1215	t.rows.Ascend(func(i btree.Item) bool {
1216		r := i.(*row)
1217		r.mu.Lock()
1218		r.gc(rules)
1219		r.mu.Unlock()
1220		return true
1221	})
1222}
1223
1224type byRowKey []*row
1225
1226func (b byRowKey) Len() int           { return len(b) }
1227func (b byRowKey) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1228func (b byRowKey) Less(i, j int) bool { return b[i].key < b[j].key }
1229
// row is a single row of a table, stored in the table's btree ordered by key.
type row struct {
	key string

	// mu is locked around reads and writes of families (see
	// ReadModifyWriteRow and table.gc).
	mu       sync.Mutex
	families map[string]*family // keyed by family name
}
1236
1237func newRow(key string) *row {
1238	return &row{
1239		key:      key,
1240		families: make(map[string]*family),
1241	}
1242}
1243
1244// copy returns a copy of the row.
1245// Cell values are aliased.
1246// r.mu should be held.
1247func (r *row) copy() *row {
1248	nr := newRow(r.key)
1249	for _, fam := range r.families {
1250		nr.families[fam.name] = &family{
1251			name:     fam.name,
1252			order:    fam.order,
1253			colNames: fam.colNames,
1254			cells:    make(map[string][]cell),
1255		}
1256		for col, cs := range fam.cells {
1257			// Copy the []cell slice, but not the []byte inside each cell.
1258			nr.families[fam.name].cells[col] = append([]cell(nil), cs...)
1259		}
1260	}
1261	return nr
1262}
1263
1264// isEmpty returns true if a row doesn't contain any cell
1265func (r *row) isEmpty() bool {
1266	for _, fam := range r.families {
1267		for _, cs := range fam.cells {
1268			if len(cs) > 0 {
1269				return false
1270			}
1271		}
1272	}
1273	return true
1274}
1275
1276// sortedFamilies returns a column family set
1277// sorted in ascending creation order in a row.
1278func (r *row) sortedFamilies() []*family {
1279	var families []*family
1280	for _, fam := range r.families {
1281		families = append(families, fam)
1282	}
1283	sort.Sort(byCreationOrder(families))
1284	return families
1285}
1286
1287func (r *row) getOrCreateFamily(name string, order uint64) *family {
1288	if _, ok := r.families[name]; !ok {
1289		r.families[name] = &family{
1290			name:  name,
1291			order: order,
1292			cells: make(map[string][]cell),
1293		}
1294	}
1295	return r.families[name]
1296}
1297
1298// gc applies the given GC rules to the row.
1299// r.mu should be held.
1300func (r *row) gc(rules map[string]*btapb.GcRule) {
1301	for _, fam := range r.families {
1302		rule, ok := rules[fam.name]
1303		if !ok {
1304			continue
1305		}
1306		for col, cs := range fam.cells {
1307			r.families[fam.name].cells[col] = applyGC(cs, rule)
1308		}
1309	}
1310}
1311
1312// size returns the total size of all cell values in the row.
1313func (r *row) size() int {
1314	size := 0
1315	for _, fam := range r.families {
1316		for _, cells := range fam.cells {
1317			for _, cell := range cells {
1318				size += len(cell.value)
1319			}
1320		}
1321	}
1322	return size
1323}
1324
1325// Less implements btree.Less.
1326func (r *row) Less(i btree.Item) bool {
1327	return r.key < i.(*row).key
1328}
1329
1330// btreeKey returns a row for use as a key into the BTree.
1331func btreeKey(s string) *row { return &row{key: s} }
1332
1333func (r *row) String() string {
1334	return r.key
1335}
1336
// gcTypeWarn makes applyGC log the "Unsupported GC rule type" message at most once per process.
var gcTypeWarn sync.Once
1338
1339// applyGC applies the given GC rule to the cells.
1340func applyGC(cells []cell, rule *btapb.GcRule) []cell {
1341	switch rule := rule.Rule.(type) {
1342	default:
1343		// TODO(dsymonds): Support GcRule_Intersection_
1344		gcTypeWarn.Do(func() {
1345			log.Printf("Unsupported GC rule type %T", rule)
1346		})
1347	case *btapb.GcRule_Union_:
1348		for _, sub := range rule.Union.Rules {
1349			cells = applyGC(cells, sub)
1350		}
1351		return cells
1352	case *btapb.GcRule_MaxAge:
1353		// Timestamps are in microseconds.
1354		cutoff := time.Now().UnixNano() / 1e3
1355		cutoff -= rule.MaxAge.Seconds * 1e6
1356		cutoff -= int64(rule.MaxAge.Nanos) / 1e3
1357		// The slice of cells in in descending timestamp order.
1358		// This sort.Search will return the index of the first cell whose timestamp is chronologically before the cutoff.
1359		si := sort.Search(len(cells), func(i int) bool { return cells[i].ts < cutoff })
1360		if si < len(cells) {
1361			log.Printf("bttest: GC MaxAge(%v) deleted %d cells.", rule.MaxAge, len(cells)-si)
1362		}
1363		return cells[:si]
1364	case *btapb.GcRule_MaxNumVersions:
1365		n := int(rule.MaxNumVersions)
1366		if len(cells) > n {
1367			cells = cells[:n]
1368		}
1369		return cells
1370	}
1371	return cells
1372}
1373
// family holds one column family's data within a single row.
type family struct {
	name     string            // Column family name
	order    uint64            // Creation order of column family
	colNames []string          // Column names are sorted in lexicographical ascending order
	cells    map[string][]cell // Keyed by column name; cells are in descending timestamp order
}
1380
1381type byCreationOrder []*family
1382
1383func (b byCreationOrder) Len() int           { return len(b) }
1384func (b byCreationOrder) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1385func (b byCreationOrder) Less(i, j int) bool { return b[i].order < b[j].order }
1386
1387// cellsByColumn adds the column name to colNames set if it does not exist
1388// and returns all cells within a column
1389func (f *family) cellsByColumn(name string) []cell {
1390	if _, ok := f.cells[name]; !ok {
1391		f.colNames = append(f.colNames, name)
1392		sort.Strings(f.colNames)
1393	}
1394	return f.cells[name]
1395}
1396
// cell is one timestamped version of a column's value.
type cell struct {
	ts     int64 // timestamp in microseconds (see newTimestamp and applyGC)
	value  []byte
	labels []string // NOTE(review): presumably labels attached by label-transformer filters — confirm
}
1402
1403type byDescTS []cell
1404
1405func (b byDescTS) Len() int           { return len(b) }
1406func (b byDescTS) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1407func (b byDescTS) Less(i, j int) bool { return b[i].ts > b[j].ts }
1408
// columnFamily is a table-level column family definition.
type columnFamily struct {
	// name is built by newTable as "<parent>/columnFamilies/<id>".
	name   string
	order  uint64 // Creation order of column family
	gcRule *btapb.GcRule // applied by table.gc; nil means no garbage collection
}
1414
1415func (c *columnFamily) proto() *btapb.ColumnFamily {
1416	return &btapb.ColumnFamily{
1417		GcRule: c.gcRule,
1418	}
1419}
1420
1421func toColumnFamilies(families map[string]*columnFamily) map[string]*btapb.ColumnFamily {
1422	fs := make(map[string]*btapb.ColumnFamily)
1423	for k, v := range families {
1424		fs[k] = v.proto()
1425	}
1426	return fs
1427}
1428