1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable // import "cloud.google.com/go/bigtable"
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"io"
24	"strconv"
25	"time"
26
27	btopt "cloud.google.com/go/bigtable/internal/option"
28	"cloud.google.com/go/internal/trace"
29	"github.com/golang/protobuf/proto"
30	gax "github.com/googleapis/gax-go/v2"
31	"google.golang.org/api/option"
32	gtransport "google.golang.org/api/transport/grpc"
33	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/metadata"
37	"google.golang.org/grpc/status"
38)
39
// prodAddr is the endpoint handed to the default client options when a
// Client is created.
const prodAddr = "bigtable.googleapis.com:443"
41
42// Client is a client for reading and writing data to tables in an instance.
43//
44// A Client is safe to use concurrently, except for its Close method.
45type Client struct {
46	connPool          gtransport.ConnPool
47	client            btpb.BigtableClient
48	project, instance string
49	appProfile        string
50}
51
// ClientConfig has configurations for the client.
// The zero value is the default configuration used by NewClient.
type ClientConfig struct {
	// AppProfile is the ID of the app profile to associate with all data
	// operations sent from this client.
	// If unspecified, the default app profile for the instance will be used.
	AppProfile string
}
58
59// NewClient creates a new Client for a given project and instance.
60// The default ClientConfig will be used.
61func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) {
62	return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...)
63}
64
65// NewClientWithConfig creates a new client with the given config.
66func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
67	o, err := btopt.DefaultClientOptions(prodAddr, Scope, clientUserAgent)
68	if err != nil {
69		return nil, err
70	}
71	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
72	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)
73
74	// Default to a small connection pool that can be overridden.
75	o = append(o,
76		option.WithGRPCConnectionPool(4),
77		// Set the max size to correspond to server-side limits.
78		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))),
79		// TODO(grpc/grpc-go#1388) using connection pool without WithBlock
80		// can cause RPCs to fail randomly. We can delete this after the issue is fixed.
81		option.WithGRPCDialOption(grpc.WithBlock()))
82	o = append(o, opts...)
83	connPool, err := gtransport.DialPool(ctx, o...)
84	if err != nil {
85		return nil, fmt.Errorf("dialing: %v", err)
86	}
87
88	return &Client{
89		connPool:   connPool,
90		client:     btpb.NewBigtableClient(connPool),
91		project:    project,
92		instance:   instance,
93		appProfile: config.AppProfile,
94	}, nil
95}
96
97// Close closes the Client.
98func (c *Client) Close() error {
99	return c.connPool.Close()
100}
101
102var (
103	idempotentRetryCodes  = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted}
104	isIdempotentRetryCode = make(map[codes.Code]bool)
105	retryOptions          = []gax.CallOption{
106		gax.WithRetry(func() gax.Retryer {
107			return gax.OnCodes(idempotentRetryCodes, gax.Backoff{
108				Initial:    100 * time.Millisecond,
109				Max:        2 * time.Second,
110				Multiplier: 1.2,
111			})
112		}),
113	}
114)
115
116func init() {
117	for _, code := range idempotentRetryCodes {
118		isIdempotentRetryCode[code] = true
119	}
120}
121
122func (c *Client) fullTableName(table string) string {
123	return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table)
124}
125
126// mergeOutgoingMetadata returns a context populated by the existing outgoing
127// metadata merged with the provided mds.
128func mergeOutgoingMetadata(ctx context.Context, mds ...metadata.MD) context.Context {
129	ctxMD, _ := metadata.FromOutgoingContext(ctx)
130	// The ordering matters, hence why ctxMD comes first.
131	allMDs := append([]metadata.MD{ctxMD}, mds...)
132	return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...))
133}
134
// A Table refers to a table.
//
// A Table is safe to use concurrently.
type Table struct {
	// c is the Client that opened this table and performs its RPCs.
	c *Client
	// table is the table ID as passed to Client.Open (not the full resource name).
	table string

	// Metadata to be sent with each request.
	md metadata.MD
}
145
146// Open opens a table.
147func (c *Client) Open(table string) *Table {
148	return &Table{
149		c:     c,
150		table: table,
151		md:    metadata.Pairs(resourcePrefixHeader, c.fullTableName(table)),
152	}
153}
154
155// TODO(dsymonds): Read method that returns a sequence of ReadItems.
156
157// ReadRows reads rows from a table. f is called for each row.
158// If f returns false, the stream is shut down and ReadRows returns.
159// f owns its argument, and f is called serially in order by row key.
160//
161// By default, the yielded rows will contain all values in all cells.
162// Use RowFilter to limit the cells returned.
163func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
164	ctx = mergeOutgoingMetadata(ctx, t.md)
165	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
166	defer func() { trace.EndSpan(ctx, err) }()
167
168	var prevRowKey string
169	attrMap := make(map[string]interface{})
170	err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
171		if !arg.valid() {
172			// Empty row set, no need to make an API call.
173			// NOTE: we must return early if arg == RowList{} because reading
174			// an empty RowList from bigtable returns all rows from that table.
175			return nil
176		}
177		req := &btpb.ReadRowsRequest{
178			TableName:    t.c.fullTableName(t.table),
179			AppProfileId: t.c.appProfile,
180			Rows:         arg.proto(),
181		}
182		for _, opt := range opts {
183			opt.set(req)
184		}
185		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
186		defer cancel()
187
188		startTime := time.Now()
189		stream, err := t.c.client.ReadRows(ctx, req)
190		if err != nil {
191			return err
192		}
193		cr := newChunkReader()
194		for {
195			res, err := stream.Recv()
196			if err == io.EOF {
197				break
198			}
199			if err != nil {
200				// Reset arg for next Invoke call.
201				arg = arg.retainRowsAfter(prevRowKey)
202				attrMap["rowKey"] = prevRowKey
203				attrMap["error"] = err.Error()
204				attrMap["time_secs"] = time.Since(startTime).Seconds()
205				trace.TracePrintf(ctx, attrMap, "Retry details in ReadRows")
206				return err
207			}
208			attrMap["time_secs"] = time.Since(startTime).Seconds()
209			attrMap["rowCount"] = len(res.Chunks)
210			trace.TracePrintf(ctx, attrMap, "Details in ReadRows")
211
212			for _, cc := range res.Chunks {
213				row, err := cr.Process(cc)
214				if err != nil {
215					// No need to prepare for a retry, this is an unretryable error.
216					return err
217				}
218				if row == nil {
219					continue
220				}
221				prevRowKey = row.Key()
222				if !f(row) {
223					// Cancel and drain stream.
224					cancel()
225					for {
226						if _, err := stream.Recv(); err != nil {
227							// The stream has ended. We don't return an error
228							// because the caller has intentionally interrupted the scan.
229							return nil
230						}
231					}
232				}
233			}
234			if err := cr.Close(); err != nil {
235				// No need to prepare for a retry, this is an unretryable error.
236				return err
237			}
238		}
239		return err
240	}, retryOptions...)
241
242	return err
243}
244
245// ReadRow is a convenience implementation of a single-row reader.
246// A missing row will return a zero-length map and a nil error.
247func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
248	var r Row
249	err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool {
250		r = rr
251		return true
252	}, opts...)
253	return r, err
254}
255
256// decodeFamilyProto adds the cell data from f to the given row.
257func decodeFamilyProto(r Row, row string, f *btpb.Family) {
258	fam := f.Name // does not have colon
259	for _, col := range f.Columns {
260		for _, cell := range col.Cells {
261			ri := ReadItem{
262				Row:       row,
263				Column:    fam + ":" + string(col.Qualifier),
264				Timestamp: Timestamp(cell.TimestampMicros),
265				Value:     cell.Value,
266			}
267			r[fam] = append(r[fam], ri)
268		}
269	}
270}
271
// RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList.
// The serialized size of the RowSet must be no larger than 1MiB.
type RowSet interface {
	// proto returns the wire representation of the set.
	proto() *btpb.RowSet

	// retainRowsAfter returns a new RowSet that does not include the
	// given row key or any row key lexicographically less than it.
	retainRowsAfter(lastRowKey string) RowSet

	// valid reports whether this set can cover at least one row.
	valid() bool
}
284
// RowList is a sequence of row keys. It implements RowSet.
type RowList []string
287
288func (r RowList) proto() *btpb.RowSet {
289	keys := make([][]byte, len(r))
290	for i, row := range r {
291		keys[i] = []byte(row)
292	}
293	return &btpb.RowSet{RowKeys: keys}
294}
295
296func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
297	var retryKeys RowList
298	for _, key := range r {
299		if key > lastRowKey {
300			retryKeys = append(retryKeys, key)
301		}
302	}
303	return retryKeys
304}
305
306func (r RowList) valid() bool {
307	return len(r) > 0
308}
309
310// A RowRange is a half-open interval [Start, Limit) encompassing
311// all the rows with keys at least as large as Start, and less than Limit.
312// (Bigtable string comparison is the same as Go's.)
313// A RowRange can be unbounded, encompassing all keys at least as large as Start.
314type RowRange struct {
315	start string
316	limit string
317}
318
319// NewRange returns the new RowRange [begin, end).
320func NewRange(begin, end string) RowRange {
321	return RowRange{
322		start: begin,
323		limit: end,
324	}
325}
326
327// Unbounded tests whether a RowRange is unbounded.
328func (r RowRange) Unbounded() bool {
329	return r.limit == ""
330}
331
332// Contains says whether the RowRange contains the key.
333func (r RowRange) Contains(row string) bool {
334	return r.start <= row && (r.limit == "" || r.limit > row)
335}
336
337// String provides a printable description of a RowRange.
338func (r RowRange) String() string {
339	a := strconv.Quote(r.start)
340	if r.Unbounded() {
341		return fmt.Sprintf("[%s,∞)", a)
342	}
343	return fmt.Sprintf("[%s,%q)", a, r.limit)
344}
345
346func (r RowRange) proto() *btpb.RowSet {
347	rr := &btpb.RowRange{
348		StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
349	}
350	if !r.Unbounded() {
351		rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
352	}
353	return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
354}
355
356func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
357	if lastRowKey == "" || lastRowKey < r.start {
358		return r
359	}
360	// Set the beginning of the range to the row after the last scanned.
361	start := lastRowKey + "\x00"
362	if r.Unbounded() {
363		return InfiniteRange(start)
364	}
365	return NewRange(start, r.limit)
366}
367
368func (r RowRange) valid() bool {
369	return r.Unbounded() || r.start < r.limit
370}
371
// RowRangeList is a sequence of RowRanges representing the union of the ranges.
// It implements RowSet.
type RowRangeList []RowRange
374
375func (r RowRangeList) proto() *btpb.RowSet {
376	ranges := make([]*btpb.RowRange, len(r))
377	for i, rr := range r {
378		// RowRange.proto() returns a RowSet with a single element RowRange array
379		ranges[i] = rr.proto().RowRanges[0]
380	}
381	return &btpb.RowSet{RowRanges: ranges}
382}
383
384func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
385	if lastRowKey == "" {
386		return r
387	}
388	// Return a list of any range that has not yet been completely processed
389	var ranges RowRangeList
390	for _, rr := range r {
391		retained := rr.retainRowsAfter(lastRowKey)
392		if retained.valid() {
393			ranges = append(ranges, retained.(RowRange))
394		}
395	}
396	return ranges
397}
398
399func (r RowRangeList) valid() bool {
400	for _, rr := range r {
401		if rr.valid() {
402			return true
403		}
404	}
405	return false
406}
407
408// SingleRow returns a RowSet for reading a single row.
409func SingleRow(row string) RowSet {
410	return RowList{row}
411}
412
413// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
414func PrefixRange(prefix string) RowRange {
415	return RowRange{
416		start: prefix,
417		limit: prefixSuccessor(prefix),
418	}
419}
420
421// InfiniteRange returns the RowRange consisting of all keys at least as
422// large as start.
423func InfiniteRange(start string) RowRange {
424	return RowRange{
425		start: start,
426		limit: "",
427	}
428}
429
// prefixSuccessor returns the lexically smallest string greater than the
// prefix, if it exists, or "" otherwise.  In either case, it is the string
// needed for the Limit of a RowRange.
//
// The successor is formed by dropping any trailing 0xff bytes and incrementing
// the last remaining byte. An empty prefix, or one made only of 0xff bytes,
// has no successor and yields "" (an infinite range).
func prefixSuccessor(prefix string) string {
	for i := len(prefix) - 1; i >= 0; i-- {
		if prefix[i] == '\xff' {
			continue
		}
		succ := make([]byte, i+1)
		copy(succ, prefix[:i+1])
		succ[i]++
		return string(succ)
	}
	return ""
}
447
// A ReadOption is an optional argument to ReadRows.
type ReadOption interface {
	// set applies the option to the outgoing read request.
	set(req *btpb.ReadRowsRequest)
}
452
453// RowFilter returns a ReadOption that applies f to the contents of read rows.
454//
455// If multiple RowFilters are provided, only the last is used. To combine filters,
456// use ChainFilters or InterleaveFilters instead.
457func RowFilter(f Filter) ReadOption { return rowFilter{f} }
458
459type rowFilter struct{ f Filter }
460
461func (rf rowFilter) set(req *btpb.ReadRowsRequest) { req.Filter = rf.f.proto() }
462
463// LimitRows returns a ReadOption that will limit the number of rows to be read.
464func LimitRows(limit int64) ReadOption { return limitRows{limit} }
465
466type limitRows struct{ limit int64 }
467
468func (lr limitRows) set(req *btpb.ReadRowsRequest) { req.RowsLimit = lr.limit }
469
470// mutationsAreRetryable returns true if all mutations are idempotent
471// and therefore retryable. A mutation is idempotent iff all cell timestamps
472// have an explicit timestamp set and do not rely on the timestamp being set on the server.
473func mutationsAreRetryable(muts []*btpb.Mutation) bool {
474	serverTime := int64(ServerTime)
475	for _, mut := range muts {
476		setCell := mut.GetSetCell()
477		if setCell != nil && setCell.TimestampMicros == serverTime {
478			return false
479		}
480	}
481	return true
482}
483
// maxMutations is the cap on the total number of mutation operations that
// ApplyBulk puts into a single request group.
const maxMutations = 100000
485
486// Apply mutates a row atomically. A mutation must contain at least one
487// operation and at most 100000 operations.
488func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) {
489	ctx = mergeOutgoingMetadata(ctx, t.md)
490	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
491	defer func() { trace.EndSpan(ctx, err) }()
492
493	after := func(res proto.Message) {
494		for _, o := range opts {
495			o.after(res)
496		}
497	}
498
499	var callOptions []gax.CallOption
500	if m.cond == nil {
501		req := &btpb.MutateRowRequest{
502			TableName:    t.c.fullTableName(t.table),
503			AppProfileId: t.c.appProfile,
504			RowKey:       []byte(row),
505			Mutations:    m.ops,
506		}
507		if mutationsAreRetryable(m.ops) {
508			callOptions = retryOptions
509		}
510		var res *btpb.MutateRowResponse
511		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
512			var err error
513			res, err = t.c.client.MutateRow(ctx, req)
514			return err
515		}, callOptions...)
516		if err == nil {
517			after(res)
518		}
519		return err
520	}
521
522	req := &btpb.CheckAndMutateRowRequest{
523		TableName:       t.c.fullTableName(t.table),
524		AppProfileId:    t.c.appProfile,
525		RowKey:          []byte(row),
526		PredicateFilter: m.cond.proto(),
527	}
528	if m.mtrue != nil {
529		if m.mtrue.cond != nil {
530			return errors.New("bigtable: conditional mutations cannot be nested")
531		}
532		req.TrueMutations = m.mtrue.ops
533	}
534	if m.mfalse != nil {
535		if m.mfalse.cond != nil {
536			return errors.New("bigtable: conditional mutations cannot be nested")
537		}
538		req.FalseMutations = m.mfalse.ops
539	}
540	if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) {
541		callOptions = retryOptions
542	}
543	var cmRes *btpb.CheckAndMutateRowResponse
544	err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
545		var err error
546		cmRes, err = t.c.client.CheckAndMutateRow(ctx, req)
547		return err
548	}, callOptions...)
549	if err == nil {
550		after(cmRes)
551	}
552	return err
553}
554
// An ApplyOption is an optional argument to Apply.
type ApplyOption interface {
	// after is called with the server's response once a mutation request
	// has returned one.
	after(res proto.Message)
}
559
560type applyAfterFunc func(res proto.Message)
561
562func (a applyAfterFunc) after(res proto.Message) { a(res) }
563
564// GetCondMutationResult returns an ApplyOption that reports whether the conditional
565// mutation's condition matched.
566func GetCondMutationResult(matched *bool) ApplyOption {
567	return applyAfterFunc(func(res proto.Message) {
568		if res, ok := res.(*btpb.CheckAndMutateRowResponse); ok {
569			*matched = res.PredicateMatched
570		}
571	})
572}
573
// Mutation represents a set of changes for a single row of a table.
type Mutation struct {
	// ops are the operations of an unconditional mutation, in the order
	// they were added.
	ops []*btpb.Mutation

	// for conditional mutations
	// cond is the predicate filter; a non-nil cond marks the mutation as
	// conditional. mtrue and mfalse are the branches; either may be nil.
	cond          Filter
	mtrue, mfalse *Mutation
}
582
583// NewMutation returns a new mutation.
584func NewMutation() *Mutation {
585	return new(Mutation)
586}
587
588// NewCondMutation returns a conditional mutation.
589// The given row filter determines which mutation is applied:
590// If the filter matches any cell in the row, mtrue is applied;
591// otherwise, mfalse is applied.
592// Either given mutation may be nil.
593//
594// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will
595// be executed serially by the server.
596func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation {
597	return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse}
598}
599
600// Set sets a value in a specified column, with the given timestamp.
601// The timestamp will be truncated to millisecond granularity.
602// A timestamp of ServerTime means to use the server timestamp.
603func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) {
604	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
605		FamilyName:      family,
606		ColumnQualifier: []byte(column),
607		TimestampMicros: int64(ts.TruncateToMilliseconds()),
608		Value:           value,
609	}}})
610}
611
612// DeleteCellsInColumn will delete all the cells whose columns are family:column.
613func (m *Mutation) DeleteCellsInColumn(family, column string) {
614	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
615		FamilyName:      family,
616		ColumnQualifier: []byte(column),
617	}}})
618}
619
620// DeleteTimestampRange deletes all cells whose columns are family:column
621// and whose timestamps are in the half-open interval [start, end).
622// If end is zero, it will be interpreted as infinity.
623// The timestamps will be truncated to millisecond granularity.
624func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) {
625	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
626		FamilyName:      family,
627		ColumnQualifier: []byte(column),
628		TimeRange: &btpb.TimestampRange{
629			StartTimestampMicros: int64(start.TruncateToMilliseconds()),
630			EndTimestampMicros:   int64(end.TruncateToMilliseconds()),
631		},
632	}}})
633}
634
635// DeleteCellsInFamily will delete all the cells whose columns are family:*.
636func (m *Mutation) DeleteCellsInFamily(family string) {
637	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{
638		FamilyName: family,
639	}}})
640}
641
642// DeleteRow deletes the entire row.
643func (m *Mutation) DeleteRow() {
644	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}})
645}
646
// entryErr is a container that combines an entry with the error that was returned for it.
// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
type entryErr struct {
	// Entry is the row key and mutations sent to the server for one row.
	Entry *btpb.MutateRowsRequest_Entry
	// Err is the most recent per-entry result recorded by doApplyBulk.
	Err error
}
653
654// ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
655// Each mutation is individually applied atomically,
656// but the set of mutations may be applied in any order.
657//
658// Two types of failures may occur. If the entire process
659// fails, (nil, err) will be returned. If specific mutations
660// fail to apply, ([]err, nil) will be returned, and the errors
661// will correspond to the relevant rowKeys/muts arguments.
662//
663// Conditional mutations cannot be applied in bulk and providing one will result in an error.
664func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
665	ctx = mergeOutgoingMetadata(ctx, t.md)
666	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
667	defer func() { trace.EndSpan(ctx, err) }()
668
669	if len(rowKeys) != len(muts) {
670		return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
671	}
672
673	origEntries := make([]*entryErr, len(rowKeys))
674	for i, key := range rowKeys {
675		mut := muts[i]
676		if mut.cond != nil {
677			return nil, errors.New("conditional mutations cannot be applied in bulk")
678		}
679		origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
680	}
681
682	for _, group := range groupEntries(origEntries, maxMutations) {
683		attrMap := make(map[string]interface{})
684		err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
685			attrMap["rowCount"] = len(group)
686			trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
687			err := t.doApplyBulk(ctx, group, opts...)
688			if err != nil {
689				// We want to retry the entire request with the current group
690				return err
691			}
692			group = t.getApplyBulkRetries(group)
693			if len(group) > 0 && len(idempotentRetryCodes) > 0 {
694				// We have at least one mutation that needs to be retried.
695				// Return an arbitrary error that is retryable according to callOptions.
696				return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
697			}
698			return nil
699		}, retryOptions...)
700		if err != nil {
701			return nil, err
702		}
703	}
704
705	// All the errors are accumulated into an array and returned, interspersed with nils for successful
706	// entries. The absence of any errors means we should return nil.
707	var foundErr bool
708	for _, entry := range origEntries {
709		if entry.Err != nil {
710			foundErr = true
711		}
712		errs = append(errs, entry.Err)
713	}
714	if foundErr {
715		return errs, nil
716	}
717	return nil, nil
718}
719
720// getApplyBulkRetries returns the entries that need to be retried
721func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
722	var retryEntries []*entryErr
723	for _, entry := range entries {
724		err := entry.Err
725		if err != nil && isIdempotentRetryCode[status.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) {
726			// There was an error and the entry is retryable.
727			retryEntries = append(retryEntries, entry)
728		}
729	}
730	return retryEntries
731}
732
733// doApplyBulk does the work of a single ApplyBulk invocation
734func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error {
735	after := func(res proto.Message) {
736		for _, o := range opts {
737			o.after(res)
738		}
739	}
740
741	entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
742	for i, entryErr := range entryErrs {
743		entries[i] = entryErr.Entry
744	}
745	req := &btpb.MutateRowsRequest{
746		TableName:    t.c.fullTableName(t.table),
747		AppProfileId: t.c.appProfile,
748		Entries:      entries,
749	}
750	stream, err := t.c.client.MutateRows(ctx, req)
751	if err != nil {
752		return err
753	}
754	for {
755		res, err := stream.Recv()
756		if err == io.EOF {
757			break
758		}
759		if err != nil {
760			return err
761		}
762
763		for i, entry := range res.Entries {
764			s := entry.Status
765			if s.Code == int32(codes.OK) {
766				entryErrs[i].Err = nil
767			} else {
768				entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message)
769			}
770		}
771		after(res)
772	}
773	return nil
774}
775
776// groupEntries groups entries into groups of a specified size without breaking up
777// individual entries.
778func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
779	var (
780		res   [][]*entryErr
781		start int
782		gmuts int
783	)
784	addGroup := func(end int) {
785		if end-start > 0 {
786			res = append(res, entries[start:end])
787			start = end
788			gmuts = 0
789		}
790	}
791	for i, e := range entries {
792		emuts := len(e.Entry.Mutations)
793		if gmuts+emuts > maxSize {
794			addGroup(i)
795		}
796		gmuts += emuts
797	}
798	addGroup(len(entries))
799	return res
800}
801
// Timestamp is in units of microseconds since 1 January 1970.
type Timestamp int64

// ServerTime is a specific Timestamp that may be passed to (*Mutation).Set.
// It indicates that the server's timestamp should be used.
const ServerTime Timestamp = -1

// Time converts a time.Time into a Timestamp.
func Time(t time.Time) Timestamp {
	micros := t.UnixNano() / int64(time.Microsecond)
	return Timestamp(micros)
}

// Now returns the Timestamp representation of the current time on the client.
func Now() Timestamp {
	return Time(time.Now())
}

// Time converts a Timestamp into a time.Time.
func (ts Timestamp) Time() time.Time {
	micros := int64(ts)
	secs, remMicros := micros/1e6, micros%1e6
	return time.Unix(secs, remMicros*1e3)
}

// TruncateToMilliseconds truncates a Timestamp to millisecond granularity,
// which is currently the only granularity supported.
// ServerTime is a sentinel and is returned unchanged.
func (ts Timestamp) TruncateToMilliseconds() Timestamp {
	if ts != ServerTime {
		ts -= ts % 1000
	}
	return ts
}
826
827// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
828// It returns the newly written cells.
829func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
830	ctx = mergeOutgoingMetadata(ctx, t.md)
831	req := &btpb.ReadModifyWriteRowRequest{
832		TableName:    t.c.fullTableName(t.table),
833		AppProfileId: t.c.appProfile,
834		RowKey:       []byte(row),
835		Rules:        m.ops,
836	}
837	res, err := t.c.client.ReadModifyWriteRow(ctx, req)
838	if err != nil {
839		return nil, err
840	}
841	if res.Row == nil {
842		return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil")
843	}
844	r := make(Row)
845	for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
846		decodeFamilyProto(r, row, fam)
847	}
848	return r, nil
849}
850
// ReadModifyWrite represents a set of operations on a single row of a table.
// It is like Mutation but for non-idempotent changes.
// When applied, these operations operate on the latest values of the row's cells,
// and result in a new value being written to the relevant cell with a timestamp
// that is max(existing timestamp, current server time).
//
// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will
// be executed serially by the server.
type ReadModifyWrite struct {
	// ops are the rules in the order they were added; they are sent as the
	// Rules of the request by ApplyReadModifyWrite.
	ops []*btpb.ReadModifyWriteRule
}
862
863// NewReadModifyWrite returns a new ReadModifyWrite.
864func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) }
865
866// AppendValue appends a value to a specific cell's value.
867// If the cell is unset, it will be treated as an empty value.
868func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) {
869	m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
870		FamilyName:      family,
871		ColumnQualifier: []byte(column),
872		Rule:            &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v},
873	})
874}
875
876// Increment interprets the value in a specific cell as a 64-bit big-endian signed integer,
877// and adds a value to it. If the cell is unset, it will be treated as zero.
878// If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite
879// operation will fail.
880func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
881	m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
882		FamilyName:      family,
883		ColumnQualifier: []byte(column),
884		Rule:            &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta},
885	})
886}
887
888// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of
889// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces.
890func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
891	ctx = mergeOutgoingMetadata(ctx, t.md)
892	var sampledRowKeys []string
893	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
894		sampledRowKeys = nil
895		req := &btpb.SampleRowKeysRequest{
896			TableName:    t.c.fullTableName(t.table),
897			AppProfileId: t.c.appProfile,
898		}
899		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
900		defer cancel()
901
902		stream, err := t.c.client.SampleRowKeys(ctx, req)
903		if err != nil {
904			return err
905		}
906		for {
907			res, err := stream.Recv()
908			if err == io.EOF {
909				break
910			}
911			if err != nil {
912				return err
913			}
914
915			key := string(res.RowKey)
916			if key == "" {
917				continue
918			}
919
920			sampledRowKeys = append(sampledRowKeys, key)
921		}
922		return nil
923	}, retryOptions...)
924	return sampledRowKeys, err
925}
926