1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18Package bttest contains test helpers for working with the bigtable package.
19
20To use a Server, create it, and then connect to it with no security:
21(The project/instance values are ignored.)
22	srv, err := bttest.NewServer("localhost:0")
23	...
24	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
25	...
26	client, err := bigtable.NewClient(ctx, proj, instance,
27	        option.WithGRPCConn(conn))
28	...
29*/
30package bttest // import "cloud.google.com/go/bigtable/bttest"
31
32import (
33	"bytes"
34	"context"
35	"encoding/binary"
36	"fmt"
37	"log"
38	"math"
39	"math/rand"
40	"net"
41	"regexp"
42	"sort"
43	"strings"
44	"sync"
45	"time"
46
47	emptypb "github.com/golang/protobuf/ptypes/empty"
48	"github.com/golang/protobuf/ptypes/wrappers"
49	"github.com/google/btree"
50	btapb "google.golang.org/genproto/googleapis/bigtable/admin/v2"
51	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
52	"google.golang.org/genproto/googleapis/longrunning"
53	statpb "google.golang.org/genproto/googleapis/rpc/status"
54	"google.golang.org/grpc"
55	"google.golang.org/grpc/codes"
56	"google.golang.org/grpc/status"
57	"rsc.io/binaryregexp"
58)
59
const (
	// minValidMilliSeconds is the lower bound of the valid timestamp range:
	// the MilliSeconds field of the minimum valid Timestamp.
	minValidMilliSeconds = 0

	// maxValidMilliSeconds is the upper bound of the valid timestamp range:
	// the MilliSeconds field of the max valid Timestamp.
	// It must match the max value of type TimestampMicros (int64)
	// truncated to millis granularity, i.e. math.MaxInt64 with its
	// remainder modulo 1000 subtracted.
	// NOTE(review): despite the name, the value is derived from a microsecond
	// quantity — confirm the unit against the code that compares against it.
	maxValidMilliSeconds = math.MaxInt64 - math.MaxInt64%1000
)
69
// validLabelTransformer matches a complete apply_label_transformer label:
// between 1 and 15 characters, each a lowercase ASCII letter, a digit or '-'.
// The pattern is anchored at both ends; without the anchors MatchString would
// accept any string that merely contains one valid character (for example
// "Bad Label" or a label longer than 15 characters).
var validLabelTransformer = regexp.MustCompile(`^[a-z0-9\-]{1,15}$`)
71
// Server is an in-memory Cloud Bigtable fake.
// It is unauthenticated, and only a rough approximation.
type Server struct {
	// Addr is the resolved address of the listener created by NewServer.
	Addr string

	l   net.Listener // listener the gRPC server serves on; closed by Close
	srv *grpc.Server // gRPC server the fake's services are registered with
	s   *server      // implementation of the Bigtable services
}
81
// server is the real implementation of the fake.
// It is a separate and unexported type so the API won't be cluttered with
// methods that are only relevant to the fake's implementation.
type server struct {
	// mu guards the fields declared directly below it.
	mu        sync.Mutex
	tables    map[string]*table          // keyed by fully qualified name
	instances map[string]*btapb.Instance // keyed by fully qualified name
	gcc       chan int                   // set when gcloop starts, closed when server shuts down

	// The service interfaces are embedded so that server satisfies them
	// without implementing every method. NewServer leaves them nil, so
	// any unimplemented methods will cause a panic.
	btapb.BigtableTableAdminServer
	btapb.BigtableInstanceAdminServer
	btpb.BigtableServer
}
96
97// NewServer creates a new Server.
98// The Server will be listening for gRPC connections, without TLS,
99// on the provided address. The resolved address is named by the Addr field.
100func NewServer(laddr string, opt ...grpc.ServerOption) (*Server, error) {
101	l, err := net.Listen("tcp", laddr)
102	if err != nil {
103		return nil, err
104	}
105
106	s := &Server{
107		Addr: l.Addr().String(),
108		l:    l,
109		srv:  grpc.NewServer(opt...),
110		s: &server{
111			tables:    make(map[string]*table),
112			instances: make(map[string]*btapb.Instance),
113		},
114	}
115	btapb.RegisterBigtableInstanceAdminServer(s.srv, s.s)
116	btapb.RegisterBigtableTableAdminServer(s.srv, s.s)
117	btpb.RegisterBigtableServer(s.srv, s.s)
118
119	go s.srv.Serve(s.l)
120
121	return s, nil
122}
123
124// Close shuts down the server.
125func (s *Server) Close() {
126	s.s.mu.Lock()
127	if s.s.gcc != nil {
128		close(s.s.gcc)
129	}
130	s.s.mu.Unlock()
131
132	s.srv.Stop()
133	s.l.Close()
134}
135
136func (s *server) CreateTable(ctx context.Context, req *btapb.CreateTableRequest) (*btapb.Table, error) {
137	tbl := req.Parent + "/tables/" + req.TableId
138
139	s.mu.Lock()
140	if _, ok := s.tables[tbl]; ok {
141		s.mu.Unlock()
142		return nil, status.Errorf(codes.AlreadyExists, "table %q already exists", tbl)
143	}
144	s.tables[tbl] = newTable(req)
145	s.mu.Unlock()
146
147	ct := &btapb.Table{
148		Name:           tbl,
149		ColumnFamilies: req.GetTable().GetColumnFamilies(),
150		Granularity:    req.GetTable().GetGranularity(),
151	}
152	if ct.Granularity == 0 {
153		ct.Granularity = btapb.Table_MILLIS
154	}
155	return ct, nil
156}
157
158func (s *server) CreateTableFromSnapshot(context.Context, *btapb.CreateTableFromSnapshotRequest) (*longrunning.Operation, error) {
159	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
160}
161
162func (s *server) ListTables(ctx context.Context, req *btapb.ListTablesRequest) (*btapb.ListTablesResponse, error) {
163	res := &btapb.ListTablesResponse{}
164	prefix := req.Parent + "/tables/"
165
166	s.mu.Lock()
167	for tbl := range s.tables {
168		if strings.HasPrefix(tbl, prefix) {
169			res.Tables = append(res.Tables, &btapb.Table{Name: tbl})
170		}
171	}
172	s.mu.Unlock()
173
174	return res, nil
175}
176
177func (s *server) GetTable(ctx context.Context, req *btapb.GetTableRequest) (*btapb.Table, error) {
178	tbl := req.Name
179
180	s.mu.Lock()
181	tblIns, ok := s.tables[tbl]
182	s.mu.Unlock()
183	if !ok {
184		return nil, status.Errorf(codes.NotFound, "table %q not found", tbl)
185	}
186
187	return &btapb.Table{
188		Name:           tbl,
189		ColumnFamilies: toColumnFamilies(tblIns.columnFamilies()),
190	}, nil
191}
192
193func (s *server) DeleteTable(ctx context.Context, req *btapb.DeleteTableRequest) (*emptypb.Empty, error) {
194	s.mu.Lock()
195	defer s.mu.Unlock()
196	if _, ok := s.tables[req.Name]; !ok {
197		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
198	}
199	delete(s.tables, req.Name)
200	return &emptypb.Empty{}, nil
201}
202
203func (s *server) ModifyColumnFamilies(ctx context.Context, req *btapb.ModifyColumnFamiliesRequest) (*btapb.Table, error) {
204	s.mu.Lock()
205	tbl, ok := s.tables[req.Name]
206	s.mu.Unlock()
207	if !ok {
208		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
209	}
210
211	tbl.mu.Lock()
212	defer tbl.mu.Unlock()
213
214	for _, mod := range req.Modifications {
215		if create := mod.GetCreate(); create != nil {
216			if _, ok := tbl.families[mod.Id]; ok {
217				return nil, status.Errorf(codes.AlreadyExists, "family %q already exists", mod.Id)
218			}
219			newcf := &columnFamily{
220				name:   req.Name + "/columnFamilies/" + mod.Id,
221				order:  tbl.counter,
222				gcRule: create.GcRule,
223			}
224			tbl.counter++
225			tbl.families[mod.Id] = newcf
226		} else if mod.GetDrop() {
227			if _, ok := tbl.families[mod.Id]; !ok {
228				return nil, fmt.Errorf("can't delete unknown family %q", mod.Id)
229			}
230			delete(tbl.families, mod.Id)
231
232			// Purge all data for this column family
233			tbl.rows.Ascend(func(i btree.Item) bool {
234				r := i.(*row)
235				r.mu.Lock()
236				defer r.mu.Unlock()
237				delete(r.families, mod.Id)
238				return true
239			})
240		} else if modify := mod.GetUpdate(); modify != nil {
241			if _, ok := tbl.families[mod.Id]; !ok {
242				return nil, fmt.Errorf("no such family %q", mod.Id)
243			}
244			newcf := &columnFamily{
245				name:   req.Name + "/columnFamilies/" + mod.Id,
246				gcRule: modify.GcRule,
247			}
248			// assume that we ALWAYS want to replace by the new setting
249			// we may need partial update through
250			tbl.families[mod.Id] = newcf
251		}
252	}
253
254	s.needGC()
255	return &btapb.Table{
256		Name:           req.Name,
257		ColumnFamilies: toColumnFamilies(tbl.families),
258		Granularity:    btapb.Table_TimestampGranularity(btapb.Table_MILLIS),
259	}, nil
260}
261
262func (s *server) DropRowRange(ctx context.Context, req *btapb.DropRowRangeRequest) (*emptypb.Empty, error) {
263	s.mu.Lock()
264	tbl, ok := s.tables[req.Name]
265	s.mu.Unlock()
266	if !ok {
267		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
268	}
269
270	tbl.mu.Lock()
271	defer tbl.mu.Unlock()
272	if req.GetDeleteAllDataFromTable() {
273		tbl.rows = btree.New(btreeDegree)
274	} else {
275		// Delete rows by prefix.
276		prefixBytes := req.GetRowKeyPrefix()
277		if prefixBytes == nil {
278			return nil, fmt.Errorf("missing row key prefix")
279		}
280		prefix := string(prefixBytes)
281
282		// The BTree does not specify what happens if rows are deleted during
283		// iteration, and it provides no "delete range" method.
284		// So we collect the rows first, then delete them one by one.
285		var rowsToDelete []*row
286		tbl.rows.AscendGreaterOrEqual(btreeKey(prefix), func(i btree.Item) bool {
287			r := i.(*row)
288			if strings.HasPrefix(r.key, prefix) {
289				rowsToDelete = append(rowsToDelete, r)
290				return true
291			}
292			return false // stop iteration
293		})
294		for _, r := range rowsToDelete {
295			tbl.rows.Delete(r)
296		}
297	}
298	return &emptypb.Empty{}, nil
299}
300
301func (s *server) GenerateConsistencyToken(ctx context.Context, req *btapb.GenerateConsistencyTokenRequest) (*btapb.GenerateConsistencyTokenResponse, error) {
302	// Check that the table exists.
303	_, ok := s.tables[req.Name]
304	if !ok {
305		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
306	}
307
308	return &btapb.GenerateConsistencyTokenResponse{
309		ConsistencyToken: "TokenFor-" + req.Name,
310	}, nil
311}
312
313func (s *server) CheckConsistency(ctx context.Context, req *btapb.CheckConsistencyRequest) (*btapb.CheckConsistencyResponse, error) {
314	// Check that the table exists.
315	_, ok := s.tables[req.Name]
316	if !ok {
317		return nil, status.Errorf(codes.NotFound, "table %q not found", req.Name)
318	}
319
320	// Check this is the right token.
321	if req.ConsistencyToken != "TokenFor-"+req.Name {
322		return nil, status.Errorf(codes.InvalidArgument, "token %q not valid", req.ConsistencyToken)
323	}
324
325	// Single cluster instances are always consistent.
326	return &btapb.CheckConsistencyResponse{
327		Consistent: true,
328	}, nil
329}
330
331func (s *server) SnapshotTable(context.Context, *btapb.SnapshotTableRequest) (*longrunning.Operation, error) {
332	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
333}
334
335func (s *server) GetSnapshot(context.Context, *btapb.GetSnapshotRequest) (*btapb.Snapshot, error) {
336	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
337}
338
339func (s *server) ListSnapshots(context.Context, *btapb.ListSnapshotsRequest) (*btapb.ListSnapshotsResponse, error) {
340	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
341}
342
343func (s *server) DeleteSnapshot(context.Context, *btapb.DeleteSnapshotRequest) (*emptypb.Empty, error) {
344	return nil, status.Errorf(codes.Unimplemented, "the emulator does not currently support snapshots")
345}
346
// ReadRows streams rows of the table named req.TableName to stream.
//
// The rows to read are selected by req.Rows (explicit keys and/or key
// ranges); when req.Rows is nil or empty every row is selected. Selected
// rows are de-duplicated, rows without any families are dropped, and the
// remainder is sent in row-key order after applying req.Filter. When
// req.RowsLimit is positive, streaming stops once that many rows have
// actually been sent (rows rejected by the filter do not count).
//
// It returns codes.NotFound for an unknown table, any error reported by
// validateRowRanges, and any error from filtering or sending a row.
func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRowsServer) error {
	s.mu.Lock()
	tbl, ok := s.tables[req.TableName]
	s.mu.Unlock()
	if !ok {
		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
	}

	if err := validateRowRanges(req); err != nil {
		return err
	}

	// Rows to read can be specified by a set of row keys and/or a set of row ranges.
	// Output is a stream of sorted, de-duped rows.
	// The table read lock is held only while collecting row pointers.
	tbl.mu.RLock()
	rowSet := make(map[string]*row)

	// addRow records a row in rowSet, keyed by row key so duplicates collapse.
	// It always returns true so it can be used as a btree iterator callback
	// that never stops early.
	addRow := func(i btree.Item) bool {
		r := i.(*row)
		rowSet[r.key] = r
		return true
	}

	if req.Rows != nil &&
		len(req.Rows.RowKeys)+len(req.Rows.RowRanges) > 0 {
		// Add the explicitly given keys
		for _, key := range req.Rows.RowKeys {
			k := string(key)
			if i := tbl.rows.Get(btreeKey(k)); i != nil {
				addRow(i)
			}
		}

		// Add keys from row ranges
		for _, rr := range req.Rows.RowRanges {
			// start is treated as inclusive and end as exclusive below.
			// Appending "\x00" to a key gives the smallest key strictly
			// greater than it, which converts an open start or a closed
			// end into that form. An empty string means "unbounded".
			var start, end string
			switch sk := rr.StartKey.(type) {
			case *btpb.RowRange_StartKeyClosed:
				start = string(sk.StartKeyClosed)
			case *btpb.RowRange_StartKeyOpen:
				start = string(sk.StartKeyOpen) + "\x00"
			}
			switch ek := rr.EndKey.(type) {
			case *btpb.RowRange_EndKeyClosed:
				end = string(ek.EndKeyClosed) + "\x00"
			case *btpb.RowRange_EndKeyOpen:
				end = string(ek.EndKeyOpen)
			}
			switch {
			case start == "" && end == "":
				tbl.rows.Ascend(addRow) // all rows
			case start == "":
				tbl.rows.AscendLessThan(btreeKey(end), addRow)
			case end == "":
				tbl.rows.AscendGreaterOrEqual(btreeKey(start), addRow)
			default:
				tbl.rows.AscendRange(btreeKey(start), btreeKey(end), addRow)
			}
		}
	} else {
		// Read all rows
		tbl.rows.Ascend(addRow)
	}
	tbl.mu.RUnlock()

	// Keep only rows that currently have at least one family; the family
	// count is read under each row's own lock.
	rows := make([]*row, 0, len(rowSet))
	for _, r := range rowSet {
		r.mu.Lock()
		fams := len(r.families)
		r.mu.Unlock()

		if fams != 0 {
			rows = append(rows, r)
		}
	}
	sort.Sort(byRowKey(rows))

	// A limit of zero (or less) means "no limit".
	limit := int(req.RowsLimit)
	count := 0
	for _, r := range rows {
		if limit > 0 && count >= limit {
			return nil
		}
		streamed, err := streamRow(stream, r, req.Filter)
		if err != nil {
			return err
		}
		// Only rows that streamRow reports as sent count against the limit.
		if streamed {
			count++
		}
	}
	return nil
}
440
441// streamRow filters the given row and sends it via the given stream.
442// Returns true if at least one cell matched the filter and was streamed, false otherwise.
443func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (bool, error) {
444	r.mu.Lock()
445	nr := r.copy()
446	r.mu.Unlock()
447	r = nr
448
449	match, err := filterRow(f, r)
450	if err != nil {
451		return false, err
452	}
453	if !match {
454		return false, nil
455	}
456
457	rrr := &btpb.ReadRowsResponse{}
458	families := r.sortedFamilies()
459	for _, fam := range families {
460		for _, colName := range fam.colNames {
461			cells := fam.cells[colName]
462			if len(cells) == 0 {
463				continue
464			}
465			for _, cell := range cells {
466				rrr.Chunks = append(rrr.Chunks, &btpb.ReadRowsResponse_CellChunk{
467					RowKey:          []byte(r.key),
468					FamilyName:      &wrappers.StringValue{Value: fam.name},
469					Qualifier:       &wrappers.BytesValue{Value: []byte(colName)},
470					TimestampMicros: cell.ts,
471					Value:           cell.value,
472					Labels:          cell.labels,
473				})
474			}
475		}
476	}
477	// We can't have a cell with just COMMIT set, which would imply a new empty cell.
478	// So modify the last cell to have the COMMIT flag set.
479	if len(rrr.Chunks) > 0 {
480		rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{CommitRow: true}
481	}
482
483	return true, stream.Send(rrr)
484}
485
486// filterRow modifies a row with the given filter. Returns true if at least one cell from the row matches,
487// false otherwise. If a filter is invalid, filterRow returns false and an error.
488func filterRow(f *btpb.RowFilter, r *row) (bool, error) {
489	if f == nil {
490		return true, nil
491	}
492	// Handle filters that apply beyond just including/excluding cells.
493	switch f := f.Filter.(type) {
494	case *btpb.RowFilter_BlockAllFilter:
495		if !f.BlockAllFilter {
496			return false, status.Errorf(codes.InvalidArgument, "block_all_filter must be true if set")
497		}
498		return false, nil
499	case *btpb.RowFilter_PassAllFilter:
500		if !f.PassAllFilter {
501			return false, status.Errorf(codes.InvalidArgument, "pass_all_filter must be true if set")
502		}
503		return true, nil
504	case *btpb.RowFilter_Chain_:
505		if len(f.Chain.Filters) < 2 {
506			return false, status.Errorf(codes.InvalidArgument, "Chain must contain at least two RowFilters")
507		}
508		for _, sub := range f.Chain.Filters {
509			match, err := filterRow(sub, r)
510			if err != nil {
511				return false, err
512			}
513			if !match {
514				return false, nil
515			}
516		}
517		return true, nil
518	case *btpb.RowFilter_Interleave_:
519		if len(f.Interleave.Filters) < 2 {
520			return false, status.Errorf(codes.InvalidArgument, "Interleave must contain at least two RowFilters")
521		}
522		srs := make([]*row, 0, len(f.Interleave.Filters))
523		for _, sub := range f.Interleave.Filters {
524			sr := r.copy()
525			match, err := filterRow(sub, sr)
526			if err != nil {
527				return false, err
528			}
529			if match {
530				srs = append(srs, sr)
531			}
532		}
533		// merge
534		// TODO(dsymonds): is this correct?
535		r.families = make(map[string]*family)
536		for _, sr := range srs {
537			for _, fam := range sr.families {
538				f := r.getOrCreateFamily(fam.name, fam.order)
539				for colName, cs := range fam.cells {
540					f.cells[colName] = append(f.cellsByColumn(colName), cs...)
541				}
542			}
543		}
544		var count int
545		for _, fam := range r.families {
546			for _, cs := range fam.cells {
547				sort.Sort(byDescTS(cs))
548				count += len(cs)
549			}
550		}
551		return count > 0, nil
552	case *btpb.RowFilter_CellsPerColumnLimitFilter:
553		lim := int(f.CellsPerColumnLimitFilter)
554		for _, fam := range r.families {
555			for col, cs := range fam.cells {
556				if len(cs) > lim {
557					fam.cells[col] = cs[:lim]
558				}
559			}
560		}
561		return true, nil
562	case *btpb.RowFilter_Condition_:
563		match, err := filterRow(f.Condition.PredicateFilter, r.copy())
564		if err != nil {
565			return false, err
566		}
567		if match {
568			if f.Condition.TrueFilter == nil {
569				return false, nil
570			}
571			return filterRow(f.Condition.TrueFilter, r)
572		}
573		if f.Condition.FalseFilter == nil {
574			return false, nil
575		}
576		return filterRow(f.Condition.FalseFilter, r)
577	case *btpb.RowFilter_RowKeyRegexFilter:
578		rx, err := newRegexp(f.RowKeyRegexFilter)
579		if err != nil {
580			return false, status.Errorf(codes.InvalidArgument, "Error in field 'rowkey_regex_filter' : %v", err)
581		}
582		if !rx.MatchString(r.key) {
583			return false, nil
584		}
585	case *btpb.RowFilter_CellsPerRowLimitFilter:
586		// Grab the first n cells in the row.
587		lim := int(f.CellsPerRowLimitFilter)
588		for _, fam := range r.families {
589			for _, col := range fam.colNames {
590				cs := fam.cells[col]
591				if len(cs) > lim {
592					fam.cells[col] = cs[:lim]
593					lim = 0
594				} else {
595					lim -= len(cs)
596				}
597			}
598		}
599		return true, nil
600	case *btpb.RowFilter_CellsPerRowOffsetFilter:
601		// Skip the first n cells in the row.
602		offset := int(f.CellsPerRowOffsetFilter)
603		for _, fam := range r.families {
604			for _, col := range fam.colNames {
605				cs := fam.cells[col]
606				if len(cs) > offset {
607					fam.cells[col] = cs[offset:]
608					offset = 0
609					return true, nil
610				}
611				fam.cells[col] = cs[:0]
612				offset -= len(cs)
613			}
614		}
615		return true, nil
616	case *btpb.RowFilter_RowSampleFilter:
617		// The row sample filter "matches all cells from a row with probability
618		// p, and matches no cells from the row with probability 1-p."
619		// See https://github.com/googleapis/googleapis/blob/master/google/bigtable/v2/data.proto
620		if f.RowSampleFilter <= 0.0 || f.RowSampleFilter >= 1.0 {
621			return false, status.Error(codes.InvalidArgument, "row_sample_filter argument must be between 0.0 and 1.0")
622		}
623		return randFloat() < f.RowSampleFilter, nil
624	}
625
626	// Any other case, operate on a per-cell basis.
627	cellCount := 0
628	for _, fam := range r.families {
629		for colName, cs := range fam.cells {
630			filtered, err := filterCells(f, fam.name, colName, cs)
631			if err != nil {
632				return false, err
633			}
634			fam.cells[colName] = filtered
635			cellCount += len(fam.cells[colName])
636		}
637	}
638	return cellCount > 0, nil
639}
640
// randFloat is the random source used by the row sample filter in filterRow.
// NOTE(review): presumably held in a variable so tests can substitute a
// deterministic function — confirm against the test files.
var randFloat = rand.Float64
642
643func filterCells(f *btpb.RowFilter, fam, col string, cs []cell) ([]cell, error) {
644	var ret []cell
645	for _, cell := range cs {
646		include, err := includeCell(f, fam, col, cell)
647		if err != nil {
648			return nil, err
649		}
650		if include {
651			cell, err = modifyCell(f, cell)
652			if err != nil {
653				return nil, err
654			}
655			ret = append(ret, cell)
656		}
657	}
658	return ret, nil
659}
660
661func modifyCell(f *btpb.RowFilter, c cell) (cell, error) {
662	if f == nil {
663		return c, nil
664	}
665	// Consider filters that may modify the cell contents
666	switch filter := f.Filter.(type) {
667	case *btpb.RowFilter_StripValueTransformer:
668		return cell{ts: c.ts}, nil
669	case *btpb.RowFilter_ApplyLabelTransformer:
670		if !validLabelTransformer.MatchString(filter.ApplyLabelTransformer) {
671			return cell{}, status.Errorf(
672				codes.InvalidArgument,
673				`apply_label_transformer must match RE2([a-z0-9\-]+), but found %v`,
674				filter.ApplyLabelTransformer,
675			)
676		}
677		return cell{ts: c.ts, value: c.value, labels: []string{filter.ApplyLabelTransformer}}, nil
678	default:
679		return c, nil
680	}
681}
682
// includeCell reports whether the given cell, located in column col of
// family fam, passes filter f.
//
// A nil filter includes every cell. Row-level and cell-modifying filters
// that are handled elsewhere include every cell here. Regex, column range,
// timestamp range and value range filters are evaluated against the cell's
// family, qualifier, timestamp or value respectively. Unknown filter types
// are logged and ignored (the cell is included). An invalid regex or a
// timestamp range with sub-millisecond precision yields a
// codes.InvalidArgument error.
func includeCell(f *btpb.RowFilter, fam, col string, cell cell) (bool, error) {
	if f == nil {
		return true, nil
	}
	// TODO(dsymonds): Implement many more filters.
	switch f := f.Filter.(type) {
	case *btpb.RowFilter_CellsPerColumnLimitFilter:
		// Don't log, row-level filter
		return true, nil
	case *btpb.RowFilter_RowKeyRegexFilter:
		// Don't log, row-level filter
		return true, nil
	case *btpb.RowFilter_StripValueTransformer:
		// Don't log, cell-modifying filter
		return true, nil
	case *btpb.RowFilter_ApplyLabelTransformer:
		// Don't log, cell-modifying filter
		return true, nil
	default:
		// Unrecognized filter types are not an error: warn and keep the cell.
		log.Printf("WARNING: don't know how to handle filter of type %T (ignoring it)", f)
		return true, nil
	case *btpb.RowFilter_FamilyNameRegexFilter:
		rx, err := newRegexp([]byte(f.FamilyNameRegexFilter))
		if err != nil {
			return false, status.Errorf(codes.InvalidArgument, "Error in field 'family_name_regex_filter' : %v", err)
		}
		return rx.MatchString(fam), nil
	case *btpb.RowFilter_ColumnQualifierRegexFilter:
		rx, err := newRegexp(f.ColumnQualifierRegexFilter)
		if err != nil {
			return false, status.Errorf(codes.InvalidArgument, "Error in field 'column_qualifier_regex_filter' : %v", err)
		}
		return rx.MatchString(col), nil
	case *btpb.RowFilter_ValueRegexFilter:
		rx, err := newRegexp(f.ValueRegexFilter)
		if err != nil {
			return false, status.Errorf(codes.InvalidArgument, "Error in field 'value_regex_filter' : %v", err)
		}
		return rx.Match(cell.value), nil
	case *btpb.RowFilter_ColumnRangeFilter:
		// Cells outside the named family never match.
		if fam != f.ColumnRangeFilter.FamilyName {
			return false, nil
		}
		// Start qualifier defaults to empty string closed
		inRangeStart := func() bool { return col >= "" }
		switch sq := f.ColumnRangeFilter.StartQualifier.(type) {
		case *btpb.ColumnRange_StartQualifierOpen:
			inRangeStart = func() bool { return col > string(sq.StartQualifierOpen) }
		case *btpb.ColumnRange_StartQualifierClosed:
			inRangeStart = func() bool { return col >= string(sq.StartQualifierClosed) }
		}
		// End qualifier defaults to no upper boundary
		inRangeEnd := func() bool { return true }
		switch eq := f.ColumnRangeFilter.EndQualifier.(type) {
		case *btpb.ColumnRange_EndQualifierClosed:
			inRangeEnd = func() bool { return col <= string(eq.EndQualifierClosed) }
		case *btpb.ColumnRange_EndQualifierOpen:
			inRangeEnd = func() bool { return col < string(eq.EndQualifierOpen) }
		}
		return inRangeStart() && inRangeEnd(), nil
	case *btpb.RowFilter_TimestampRangeFilter:
		// Server should only support millisecond precision.
		// Both bounds must therefore be whole multiples of 1000 microseconds.
		if f.TimestampRangeFilter.StartTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 || f.TimestampRangeFilter.EndTimestampMicros%int64(time.Millisecond/time.Microsecond) != 0 {
			return false, status.Errorf(codes.InvalidArgument, "Error in field 'timestamp_range_filter'. Maximum precision allowed in filter is millisecond.\nGot:\nStart: %v\nEnd: %v", f.TimestampRangeFilter.StartTimestampMicros, f.TimestampRangeFilter.EndTimestampMicros)
		}
		// Lower bound is inclusive and defaults to 0, upper bound is exclusive and defaults to infinity.
		return cell.ts >= f.TimestampRangeFilter.StartTimestampMicros &&
			(f.TimestampRangeFilter.EndTimestampMicros == 0 || cell.ts < f.TimestampRangeFilter.EndTimestampMicros), nil
	case *btpb.RowFilter_ValueRangeFilter:
		v := cell.value
		// Start value defaults to empty string closed
		inRangeStart := func() bool { return bytes.Compare(v, []byte{}) >= 0 }
		switch sv := f.ValueRangeFilter.StartValue.(type) {
		case *btpb.ValueRange_StartValueOpen:
			inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueOpen) > 0 }
		case *btpb.ValueRange_StartValueClosed:
			inRangeStart = func() bool { return bytes.Compare(v, sv.StartValueClosed) >= 0 }
		}
		// End value defaults to no upper boundary
		inRangeEnd := func() bool { return true }
		switch ev := f.ValueRangeFilter.EndValue.(type) {
		case *btpb.ValueRange_EndValueClosed:
			inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueClosed) <= 0 }
		case *btpb.ValueRange_EndValueOpen:
			inRangeEnd = func() bool { return bytes.Compare(v, ev.EndValueOpen) < 0 }
		}
		return inRangeStart() && inRangeEnd(), nil
	}
}
772
// escapeUTF rewrites every non-ASCII byte (value above 127) of a pattern as
// the four-character escape \xHH, using upper-case hex digits, and leaves
// ASCII bytes untouched. It is applied to patterns before they are handed
// to binaryregexp so that regexp matching on column and row keys behaves
// more like the real BigTable.
//
// When the input contains no non-ASCII bytes it is returned as is (the same
// slice, not a copy).
func escapeUTF(in []byte) []byte {
	nonASCII := 0
	for i := range in {
		if in[i] >= 0x80 {
			nonASCII++
		}
	}
	if nonASCII == 0 {
		return in
	}

	const hexDigits = "0123456789ABCDEF"

	// An escaped byte occupies four bytes instead of one, i.e. three extra.
	escaped := make([]byte, 0, len(in)+3*nonASCII)
	for _, b := range in {
		if b < 0x80 {
			escaped = append(escaped, b)
			continue
		}
		escaped = append(escaped, '\\', 'x', hexDigits[b>>4], hexDigits[b&0x0F])
	}
	return escaped
}
799
800func newRegexp(pat []byte) (*binaryregexp.Regexp, error) {
801	re, err := binaryregexp.Compile("^(?:" + string(escapeUTF(pat)) + ")$") // match entire target
802	if err != nil {
803		log.Printf("Bad pattern %q: %v", pat, err)
804	}
805	return re, err
806}
807
808func (s *server) MutateRow(ctx context.Context, req *btpb.MutateRowRequest) (*btpb.MutateRowResponse, error) {
809	s.mu.Lock()
810	tbl, ok := s.tables[req.TableName]
811	s.mu.Unlock()
812	if !ok {
813		return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
814	}
815	fs := tbl.columnFamilies()
816	r := tbl.mutableRow(string(req.RowKey))
817	r.mu.Lock()
818	defer r.mu.Unlock()
819	if err := applyMutations(tbl, r, req.Mutations, fs); err != nil {
820		return nil, err
821	}
822	return &btpb.MutateRowResponse{}, nil
823}
824
825func (s *server) MutateRows(req *btpb.MutateRowsRequest, stream btpb.Bigtable_MutateRowsServer) error {
826	s.mu.Lock()
827	tbl, ok := s.tables[req.TableName]
828	s.mu.Unlock()
829	if !ok {
830		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
831	}
832	res := &btpb.MutateRowsResponse{Entries: make([]*btpb.MutateRowsResponse_Entry, len(req.Entries))}
833
834	fs := tbl.columnFamilies()
835
836	for i, entry := range req.Entries {
837		r := tbl.mutableRow(string(entry.RowKey))
838		r.mu.Lock()
839		code, msg := int32(codes.OK), ""
840		if err := applyMutations(tbl, r, entry.Mutations, fs); err != nil {
841			code = int32(codes.Internal)
842			msg = err.Error()
843		}
844		res.Entries[i] = &btpb.MutateRowsResponse_Entry{
845			Index:  int64(i),
846			Status: &statpb.Status{Code: code, Message: msg},
847		}
848		r.mu.Unlock()
849	}
850	return stream.Send(res)
851}
852
853func (s *server) CheckAndMutateRow(ctx context.Context, req *btpb.CheckAndMutateRowRequest) (*btpb.CheckAndMutateRowResponse, error) {
854	s.mu.Lock()
855	tbl, ok := s.tables[req.TableName]
856	s.mu.Unlock()
857	if !ok {
858		return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
859	}
860	res := &btpb.CheckAndMutateRowResponse{}
861
862	fs := tbl.columnFamilies()
863
864	r := tbl.mutableRow(string(req.RowKey))
865	r.mu.Lock()
866	defer r.mu.Unlock()
867
868	// Figure out which mutation to apply.
869	whichMut := false
870	if req.PredicateFilter == nil {
871		// Use true_mutations iff row contains any cells.
872		whichMut = !r.isEmpty()
873	} else {
874		// Use true_mutations iff any cells in the row match the filter.
875		// TODO(dsymonds): This could be cheaper.
876		nr := r.copy()
877
878		match, err := filterRow(req.PredicateFilter, nr)
879		if err != nil {
880			return nil, err
881		}
882		whichMut = match && !nr.isEmpty()
883	}
884	res.PredicateMatched = whichMut
885	muts := req.FalseMutations
886	if whichMut {
887		muts = req.TrueMutations
888	}
889
890	if err := applyMutations(tbl, r, muts, fs); err != nil {
891		return nil, err
892	}
893	return res, nil
894}
895
// applyMutations applies a sequence of mutations to a row, in order.
// fs is the caller's view of the table's column families, keyed by family
// name (callers in this file pass the result of tbl.columnFamilies()).
// It assumes r.mu is locked.
//
// Processing stops at the first failing mutation and its error is returned;
// mutations applied before it are not undone. Errors are reported for an
// unsupported mutation type, an unknown column family (SetCell and
// DeleteFromColumn only), and invalid or inverted timestamps.
func applyMutations(tbl *table, r *row, muts []*btpb.Mutation, fs map[string]*columnFamily) error {
	for _, mut := range muts {
		switch mut := mut.Mutation.(type) {
		default:
			return fmt.Errorf("can't handle mutation type %T", mut)
		case *btpb.Mutation_SetCell_:
			set := mut.SetCell
			if _, ok := fs[set.FamilyName]; !ok {
				return fmt.Errorf("unknown family %q", set.FamilyName)
			}
			ts := set.TimestampMicros
			// A timestamp of -1 asks the server to assign the current time.
			if ts == -1 { // bigtable.ServerTime
				ts = newTimestamp()
			}
			if !tbl.validTimestamp(ts) {
				return fmt.Errorf("invalid timestamp %d", ts)
			}
			fam := set.FamilyName
			col := string(set.ColumnQualifier)

			// Writing a cell whose timestamp already exists in the column
			// replaces the old cell rather than adding a second one.
			newCell := cell{ts: ts, value: set.Value}
			f := r.getOrCreateFamily(fam, fs[fam].order)
			f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell)
		case *btpb.Mutation_DeleteFromColumn_:
			del := mut.DeleteFromColumn
			if _, ok := fs[del.FamilyName]; !ok {
				return fmt.Errorf("unknown family %q", del.FamilyName)
			}
			fam := del.FamilyName
			col := string(del.ColumnQualifier)
			// Nothing to do when the row has no data in this family.
			if _, ok := r.families[fam]; ok {
				cs := r.families[fam].cells[col]
				if del.TimeRange != nil {
					tsr := del.TimeRange
					if !tbl.validTimestamp(tsr.StartTimestampMicros) {
						return fmt.Errorf("invalid timestamp %d", tsr.StartTimestampMicros)
					}
					// An end of 0 means "no upper bound" and is exempt from
					// the validity and ordering checks.
					if !tbl.validTimestamp(tsr.EndTimestampMicros) && tsr.EndTimestampMicros != 0 {
						return fmt.Errorf("invalid timestamp %d", tsr.EndTimestampMicros)
					}
					if tsr.StartTimestampMicros >= tsr.EndTimestampMicros && tsr.EndTimestampMicros != 0 {
						return fmt.Errorf("inverted or invalid timestamp range [%d, %d]", tsr.StartTimestampMicros, tsr.EndTimestampMicros)
					}

					// Find half-open interval to remove.
					// Cells are in descending timestamp order,
					// so the predicates to sort.Search are inverted.
					// si becomes the index of the first cell older than the
					// end bound, ei the index of the first cell older than
					// the start bound; cs[si:ei] is the span to delete.
					si, ei := 0, len(cs)
					if tsr.StartTimestampMicros > 0 {
						ei = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.StartTimestampMicros })
					}
					if tsr.EndTimestampMicros > 0 {
						si = sort.Search(len(cs), func(i int) bool { return cs[i].ts < tsr.EndTimestampMicros })
					}
					if si < ei {
						// Close the gap in place, then trim the tail.
						copy(cs[si:], cs[ei:])
						cs = cs[:len(cs)-(ei-si)]
					}
				} else {
					// No time range: delete every cell in the column.
					cs = nil
				}
				if len(cs) == 0 {
					// The column is now empty: drop it from the family's
					// cell map and from its list of column names, and
					// drop the family itself if that was its last column.
					delete(r.families[fam].cells, col)
					colNames := r.families[fam].colNames
					i := sort.Search(len(colNames), func(i int) bool { return colNames[i] >= col })
					if i < len(colNames) && colNames[i] == col {
						r.families[fam].colNames = append(colNames[:i], colNames[i+1:]...)
					}
					if len(r.families[fam].cells) == 0 {
						delete(r.families, fam)
					}
				} else {
					r.families[fam].cells[col] = cs
				}
			}
		case *btpb.Mutation_DeleteFromRow_:
			// Deleting the row's data just resets its family map.
			r.families = make(map[string]*family)
		case *btpb.Mutation_DeleteFromFamily_:
			// Deleting a family the row does not have is a no-op.
			fampre := mut.DeleteFromFamily.FamilyName
			delete(r.families, fampre)
		}
	}
	return nil
}
983
// maxTimestamp returns the larger of the two timestamps x and y.
func maxTimestamp(x, y int64) int64 {
	if y >= x {
		return y
	}
	return x
}
990
// newTimestamp returns the current wall-clock time as microseconds since the
// Unix epoch, truncated down to a whole millisecond.
func newTimestamp() int64 {
	micros := time.Now().UnixNano() / 1e3
	return micros - micros%1000
}
996
997func appendOrReplaceCell(cs []cell, newCell cell) []cell {
998	replaced := false
999	for i, cell := range cs {
1000		if cell.ts == newCell.ts {
1001			cs[i] = newCell
1002			replaced = true
1003			break
1004		}
1005	}
1006	if !replaced {
1007		cs = append(cs, newCell)
1008	}
1009	sort.Sort(byDescTS(cs))
1010	return cs
1011}
1012
1013func (s *server) ReadModifyWriteRow(ctx context.Context, req *btpb.ReadModifyWriteRowRequest) (*btpb.ReadModifyWriteRowResponse, error) {
1014	s.mu.Lock()
1015	tbl, ok := s.tables[req.TableName]
1016	s.mu.Unlock()
1017	if !ok {
1018		return nil, status.Errorf(codes.NotFound, "table %q not found", req.TableName)
1019	}
1020
1021	fs := tbl.columnFamilies()
1022
1023	rowKey := string(req.RowKey)
1024	r := tbl.mutableRow(rowKey)
1025	resultRow := newRow(rowKey) // copy of updated cells
1026
1027	// This must be done before the row lock, acquired below, is released.
1028	r.mu.Lock()
1029	defer r.mu.Unlock()
1030	// Assume all mutations apply to the most recent version of the cell.
1031	// TODO(dsymonds): Verify this assumption and document it in the proto.
1032	for _, rule := range req.Rules {
1033		if _, ok := fs[rule.FamilyName]; !ok {
1034			return nil, fmt.Errorf("unknown family %q", rule.FamilyName)
1035		}
1036
1037		fam := rule.FamilyName
1038		col := string(rule.ColumnQualifier)
1039		isEmpty := false
1040		f := r.getOrCreateFamily(fam, fs[fam].order)
1041		cs := f.cells[col]
1042		isEmpty = len(cs) == 0
1043
1044		ts := newTimestamp()
1045		var newCell, prevCell cell
1046		if !isEmpty {
1047			cells := r.families[fam].cells[col]
1048			prevCell = cells[0]
1049
1050			// ts is the max of now or the prev cell's timestamp in case the
1051			// prev cell is in the future
1052			ts = maxTimestamp(ts, prevCell.ts)
1053		}
1054
1055		switch rule := rule.Rule.(type) {
1056		default:
1057			return nil, fmt.Errorf("unknown RMW rule oneof %T", rule)
1058		case *btpb.ReadModifyWriteRule_AppendValue:
1059			newCell = cell{ts: ts, value: append(prevCell.value, rule.AppendValue...)}
1060		case *btpb.ReadModifyWriteRule_IncrementAmount:
1061			var v int64
1062			if !isEmpty {
1063				prevVal := prevCell.value
1064				if len(prevVal) != 8 {
1065					return nil, fmt.Errorf("increment on non-64-bit value")
1066				}
1067				v = int64(binary.BigEndian.Uint64(prevVal))
1068			}
1069			v += rule.IncrementAmount
1070			var val [8]byte
1071			binary.BigEndian.PutUint64(val[:], uint64(v))
1072			newCell = cell{ts: ts, value: val[:]}
1073		}
1074
1075		// Store the new cell
1076		f.cells[col] = appendOrReplaceCell(f.cellsByColumn(col), newCell)
1077
1078		// Store a copy for the result row
1079		resultFamily := resultRow.getOrCreateFamily(fam, fs[fam].order)
1080		resultFamily.cellsByColumn(col)           // create the column
1081		resultFamily.cells[col] = []cell{newCell} // overwrite the cells
1082	}
1083
1084	// Build the response using the result row
1085	res := &btpb.Row{
1086		Key:      req.RowKey,
1087		Families: make([]*btpb.Family, len(resultRow.families)),
1088	}
1089
1090	for i, family := range resultRow.sortedFamilies() {
1091		res.Families[i] = &btpb.Family{
1092			Name:    family.name,
1093			Columns: make([]*btpb.Column, len(family.colNames)),
1094		}
1095
1096		for j, colName := range family.colNames {
1097			res.Families[i].Columns[j] = &btpb.Column{
1098				Qualifier: []byte(colName),
1099				Cells: []*btpb.Cell{{
1100					TimestampMicros: family.cells[colName][0].ts,
1101					Value:           family.cells[colName][0].value,
1102				}},
1103			}
1104		}
1105	}
1106	return &btpb.ReadModifyWriteRowResponse{Row: res}, nil
1107}
1108
1109func (s *server) SampleRowKeys(req *btpb.SampleRowKeysRequest, stream btpb.Bigtable_SampleRowKeysServer) error {
1110	s.mu.Lock()
1111	tbl, ok := s.tables[req.TableName]
1112	s.mu.Unlock()
1113	if !ok {
1114		return status.Errorf(codes.NotFound, "table %q not found", req.TableName)
1115	}
1116
1117	tbl.mu.RLock()
1118	defer tbl.mu.RUnlock()
1119
1120	// The return value of SampleRowKeys is very loosely defined. Return at least the
1121	// final row key in the table and choose other row keys randomly.
1122	var offset int64
1123	var err error
1124	i := 0
1125	tbl.rows.Ascend(func(it btree.Item) bool {
1126		row := it.(*row)
1127		if i == tbl.rows.Len()-1 || rand.Int31n(100) == 0 {
1128			resp := &btpb.SampleRowKeysResponse{
1129				RowKey:      []byte(row.key),
1130				OffsetBytes: offset,
1131			}
1132			err = stream.Send(resp)
1133			if err != nil {
1134				return false
1135			}
1136		}
1137		offset += int64(row.size())
1138		i++
1139		return true
1140	})
1141	return err
1142}
1143
1144// needGC is invoked whenever the server needs gcloop running.
1145func (s *server) needGC() {
1146	s.mu.Lock()
1147	if s.gcc == nil {
1148		s.gcc = make(chan int)
1149		go s.gcloop(s.gcc)
1150	}
1151	s.mu.Unlock()
1152}
1153
1154func (s *server) gcloop(done <-chan int) {
1155	const (
1156		minWait = 500  // ms
1157		maxWait = 1500 // ms
1158	)
1159
1160	for {
1161		// Wait for a random time interval.
1162		d := time.Duration(minWait+rand.Intn(maxWait-minWait)) * time.Millisecond
1163		select {
1164		case <-time.After(d):
1165		case <-done:
1166			return // server has been closed
1167		}
1168
1169		// Do a GC pass over all tables.
1170		var tables []*table
1171		s.mu.Lock()
1172		for _, tbl := range s.tables {
1173			tables = append(tables, tbl)
1174		}
1175		s.mu.Unlock()
1176		for _, tbl := range tables {
1177			tbl.gc()
1178		}
1179	}
1180}
1181
// table is the in-memory state of a single table: its column family
// definitions and its rows.
//
// mu is held (at least for reading) while families is read and while rows is
// searched or iterated, and held for writing while a row is inserted. The
// contents of an individual row are protected by that row's own mutex.
type table struct {
	mu       sync.RWMutex
	counter  uint64                   // increment by 1 when a new family is created
	families map[string]*columnFamily // keyed by plain family name
	rows     *btree.BTree             // indexed by row key
}
1188
// btreeDegree is the degree passed to btree.New when creating a table's row index.
const btreeDegree = 16
1190
1191func newTable(ctr *btapb.CreateTableRequest) *table {
1192	fams := make(map[string]*columnFamily)
1193	c := uint64(0)
1194	if ctr.Table != nil {
1195		for id, cf := range ctr.Table.ColumnFamilies {
1196			fams[id] = &columnFamily{
1197				name:   ctr.Parent + "/columnFamilies/" + id,
1198				order:  c,
1199				gcRule: cf.GcRule,
1200			}
1201			c++
1202		}
1203	}
1204	return &table{
1205		families: fams,
1206		counter:  c,
1207		rows:     btree.New(btreeDegree),
1208	}
1209}
1210
1211func (t *table) validTimestamp(ts int64) bool {
1212	if ts < minValidMilliSeconds || ts > maxValidMilliSeconds {
1213		return false
1214	}
1215
1216	// Assume millisecond granularity is required.
1217	return ts%1000 == 0
1218}
1219
1220func (t *table) columnFamilies() map[string]*columnFamily {
1221	cp := make(map[string]*columnFamily)
1222	t.mu.RLock()
1223	for fam, cf := range t.families {
1224		cp[fam] = cf
1225	}
1226	t.mu.RUnlock()
1227	return cp
1228}
1229
1230func (t *table) mutableRow(key string) *row {
1231	bkey := btreeKey(key)
1232	// Try fast path first.
1233	t.mu.RLock()
1234	i := t.rows.Get(bkey)
1235	t.mu.RUnlock()
1236	if i != nil {
1237		return i.(*row)
1238	}
1239
1240	// We probably need to create the row.
1241	t.mu.Lock()
1242	defer t.mu.Unlock()
1243	i = t.rows.Get(bkey)
1244	if i != nil {
1245		return i.(*row)
1246	}
1247	r := newRow(key)
1248	t.rows.ReplaceOrInsert(r)
1249	return r
1250}
1251
1252func (t *table) gc() {
1253	// This method doesn't add or remove rows, so we only need a read lock for the table.
1254	t.mu.RLock()
1255	defer t.mu.RUnlock()
1256
1257	// Gather GC rules we'll apply.
1258	rules := make(map[string]*btapb.GcRule) // keyed by "fam"
1259	for fam, cf := range t.families {
1260		if cf.gcRule != nil {
1261			rules[fam] = cf.gcRule
1262		}
1263	}
1264	if len(rules) == 0 {
1265		return
1266	}
1267
1268	t.rows.Ascend(func(i btree.Item) bool {
1269		r := i.(*row)
1270		r.mu.Lock()
1271		r.gc(rules)
1272		r.mu.Unlock()
1273		return true
1274	})
1275}
1276
1277type byRowKey []*row
1278
1279func (b byRowKey) Len() int           { return len(b) }
1280func (b byRowKey) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1281func (b byRowKey) Less(i, j int) bool { return b[i].key < b[j].key }
1282
// row is a single table row: its key plus the column families that currently
// hold cells for it. A *row is also the item type stored in a table's BTree
// (see Less and btreeKey).
type row struct {
	key string

	// mu is the per-row lock; methods such as copy and gc document that it
	// should be held by the caller.
	mu       sync.Mutex
	families map[string]*family // keyed by family name
}
1289
1290func newRow(key string) *row {
1291	return &row{
1292		key:      key,
1293		families: make(map[string]*family),
1294	}
1295}
1296
1297// copy returns a copy of the row.
1298// Cell values are aliased.
1299// r.mu should be held.
1300func (r *row) copy() *row {
1301	nr := newRow(r.key)
1302	for _, fam := range r.families {
1303		nr.families[fam.name] = &family{
1304			name:     fam.name,
1305			order:    fam.order,
1306			colNames: fam.colNames,
1307			cells:    make(map[string][]cell),
1308		}
1309		for col, cs := range fam.cells {
1310			// Copy the []cell slice, but not the []byte inside each cell.
1311			nr.families[fam.name].cells[col] = append([]cell(nil), cs...)
1312		}
1313	}
1314	return nr
1315}
1316
1317// isEmpty returns true if a row doesn't contain any cell
1318func (r *row) isEmpty() bool {
1319	for _, fam := range r.families {
1320		for _, cs := range fam.cells {
1321			if len(cs) > 0 {
1322				return false
1323			}
1324		}
1325	}
1326	return true
1327}
1328
1329// sortedFamilies returns a column family set
1330// sorted in ascending creation order in a row.
1331func (r *row) sortedFamilies() []*family {
1332	var families []*family
1333	for _, fam := range r.families {
1334		families = append(families, fam)
1335	}
1336	sort.Sort(byCreationOrder(families))
1337	return families
1338}
1339
1340func (r *row) getOrCreateFamily(name string, order uint64) *family {
1341	if _, ok := r.families[name]; !ok {
1342		r.families[name] = &family{
1343			name:  name,
1344			order: order,
1345			cells: make(map[string][]cell),
1346		}
1347	}
1348	return r.families[name]
1349}
1350
1351// gc applies the given GC rules to the row.
1352// r.mu should be held.
1353func (r *row) gc(rules map[string]*btapb.GcRule) {
1354	for _, fam := range r.families {
1355		rule, ok := rules[fam.name]
1356		if !ok {
1357			continue
1358		}
1359		for col, cs := range fam.cells {
1360			r.families[fam.name].cells[col] = applyGC(cs, rule)
1361		}
1362	}
1363}
1364
1365// size returns the total size of all cell values in the row.
1366func (r *row) size() int {
1367	size := 0
1368	for _, fam := range r.families {
1369		for _, cells := range fam.cells {
1370			for _, cell := range cells {
1371				size += len(cell.value)
1372			}
1373		}
1374	}
1375	return size
1376}
1377
1378// Less implements btree.Less.
1379func (r *row) Less(i btree.Item) bool {
1380	return r.key < i.(*row).key
1381}
1382
1383// btreeKey returns a row for use as a key into the BTree.
1384func btreeKey(s string) *row { return &row{key: s} }
1385
1386func (r *row) String() string {
1387	return r.key
1388}
1389
// gcTypeWarn limits the "Unsupported GC rule type" log message emitted by
// applyGC to a single occurrence.
var gcTypeWarn sync.Once
1391
1392// applyGC applies the given GC rule to the cells.
1393func applyGC(cells []cell, rule *btapb.GcRule) []cell {
1394	switch rule := rule.Rule.(type) {
1395	default:
1396		// TODO(dsymonds): Support GcRule_Intersection_
1397		gcTypeWarn.Do(func() {
1398			log.Printf("Unsupported GC rule type %T", rule)
1399		})
1400	case *btapb.GcRule_Union_:
1401		for _, sub := range rule.Union.Rules {
1402			cells = applyGC(cells, sub)
1403		}
1404		return cells
1405	case *btapb.GcRule_MaxAge:
1406		// Timestamps are in microseconds.
1407		cutoff := time.Now().UnixNano() / 1e3
1408		cutoff -= rule.MaxAge.Seconds * 1e6
1409		cutoff -= int64(rule.MaxAge.Nanos) / 1e3
1410		// The slice of cells in in descending timestamp order.
1411		// This sort.Search will return the index of the first cell whose timestamp is chronologically before the cutoff.
1412		si := sort.Search(len(cells), func(i int) bool { return cells[i].ts < cutoff })
1413		if si < len(cells) {
1414			log.Printf("bttest: GC MaxAge(%v) deleted %d cells.", rule.MaxAge, len(cells)-si)
1415		}
1416		return cells[:si]
1417	case *btapb.GcRule_MaxNumVersions:
1418		n := int(rule.MaxNumVersions)
1419		if len(cells) > n {
1420			cells = cells[:n]
1421		}
1422		return cells
1423	}
1424	return cells
1425}
1426
// family holds the cells that one row stores under a single column family.
// colNames and the key set of cells are maintained together: cellsByColumn
// registers a column name the first time the column is requested.
type family struct {
	name     string            // Column family name
	order    uint64            // Creation order of column family
	colNames []string          // Column names are sorted in lexicographical ascending order
	cells    map[string][]cell // Keyed by column name; cells are in descending timestamp order
}
1433
1434type byCreationOrder []*family
1435
1436func (b byCreationOrder) Len() int           { return len(b) }
1437func (b byCreationOrder) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1438func (b byCreationOrder) Less(i, j int) bool { return b[i].order < b[j].order }
1439
1440// cellsByColumn adds the column name to colNames set if it does not exist
1441// and returns all cells within a column
1442func (f *family) cellsByColumn(name string) []cell {
1443	if _, ok := f.cells[name]; !ok {
1444		f.colNames = append(f.colNames, name)
1445		sort.Strings(f.colNames)
1446	}
1447	return f.cells[name]
1448}
1449
// cell is one timestamped version of a column's value.
type cell struct {
	ts     int64    // timestamp in microseconds (compared against TimestampMicros fields)
	value  []byte   // raw cell contents
	labels []string // NOTE(review): not set anywhere in this part of the file — confirm where labels are attached
}
1455
1456type byDescTS []cell
1457
1458func (b byDescTS) Len() int           { return len(b) }
1459func (b byDescTS) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
1460func (b byDescTS) Less(i, j int) bool { return b[i].ts > b[j].ts }
1461
// columnFamily is the table-level definition of a column family.
type columnFamily struct {
	name   string        // as assigned at creation; newTable uses ctr.Parent + "/columnFamilies/" + id
	order  uint64        // Creation order of column family
	gcRule *btapb.GcRule // GC rule for the family's cells; nil means table.gc skips the family
}
1467
1468func (c *columnFamily) proto() *btapb.ColumnFamily {
1469	return &btapb.ColumnFamily{
1470		GcRule: c.gcRule,
1471	}
1472}
1473
1474func toColumnFamilies(families map[string]*columnFamily) map[string]*btapb.ColumnFamily {
1475	fs := make(map[string]*btapb.ColumnFamily)
1476	for k, v := range families {
1477		fs[k] = v.proto()
1478	}
1479	return fs
1480}
1481