/*
Copyright 2015 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bigtable // import "cloud.google.com/go/bigtable"

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"
	"strconv"
	"time"

	btopt "cloud.google.com/go/bigtable/internal/option"
	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/proto"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	"google.golang.org/api/option/internaloption"
	gtransport "google.golang.org/api/transport/grpc"
	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"

// Client is a client for reading and writing data to tables in an instance.
//
// A Client is safe to use concurrently, except for its Close method.
type Client struct {
	connPool          gtransport.ConnPool
	client            btpb.BigtableClient
	project, instance string
	appProfile        string
}

// ClientConfig has configurations for the client.
type ClientConfig struct {
	// The id of the app profile to associate with all data operations sent from this client.
	// If unspecified, the default app profile for the instance will be used.
	AppProfile string
}

// NewClient creates a new Client for a given project and instance.
// The default ClientConfig will be used.
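//
// A minimal usage sketch; the project, instance, and table names below are
// placeholders, and ctx is assumed to be a valid context.Context:
//
//	client, err := bigtable.NewClient(ctx, "my-project", "my-instance")
//	if err != nil {
//		// TODO: handle error.
//	}
//	defer client.Close()
//	tbl := client.Open("my-table")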
func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) {
	return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...)
}

// NewClientWithConfig creates a new client with the given config.
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
	o, err := btopt.DefaultClientOptions(prodAddr, mtlsProdAddr, Scope, clientUserAgent)
	if err != nil {
		return nil, err
	}
	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)

	// Default to a small connection pool that can be overridden.
	o = append(o,
		option.WithGRPCConnectionPool(4),
		// Set the max size to correspond to server-side limits.
		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))),
	)
	// Attempt direct access to the Cloud Bigtable service over gRPC to improve throughput;
	// whether the attempt is allowed is entirely controlled by the service owner.
	o = append(o, internaloption.EnableDirectPath(true))
	o = append(o, opts...)
	connPool, err := gtransport.DialPool(ctx, o...)
	if err != nil {
		return nil, fmt.Errorf("dialing: %v", err)
	}

	return &Client{
		connPool:   connPool,
		client:     btpb.NewBigtableClient(connPool),
		project:    project,
		instance:   instance,
		appProfile: config.AppProfile,
	}, nil
}

// Close closes the Client.
func (c *Client) Close() error {
	return c.connPool.Close()
}

var (
	idempotentRetryCodes  = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted}
	isIdempotentRetryCode = make(map[codes.Code]bool)
	retryOptions          = []gax.CallOption{
		gax.WithRetry(func() gax.Retryer {
			return gax.OnCodes(idempotentRetryCodes, gax.Backoff{
				Initial:    100 * time.Millisecond,
				Max:        2 * time.Second,
				Multiplier: 1.2,
			})
		}),
	}
)

func init() {
	for _, code := range idempotentRetryCodes {
		isIdempotentRetryCode[code] = true
	}
}

func (c *Client) fullTableName(table string) string {
	return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table)
}

func (c *Client) requestParamsHeaderValue(table string) string {
	return fmt.Sprintf("table_name=%s&app_profile=%s", url.QueryEscape(c.fullTableName(table)), url.QueryEscape(c.appProfile))
}

// mergeOutgoingMetadata returns a context populated by the existing outgoing
// metadata merged with the provided mds.
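//
// A minimal illustration of the merge semantics (not part of the API; the key
// and values are arbitrary):
//
//	ctx := metadata.AppendToOutgoingContext(context.Background(), "k", "a")
//	ctx = mergeOutgoingMetadata(ctx, metadata.Pairs("k", "b"))
//	md, _ := metadata.FromOutgoingContext(ctx)
//	// md["k"] == []string{"a", "b"}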
func mergeOutgoingMetadata(ctx context.Context, mds ...metadata.MD) context.Context {
	ctxMD, _ := metadata.FromOutgoingContext(ctx)
	// The ordering matters: ctxMD comes first so that values already on the
	// context precede the values provided here.
	allMDs := append([]metadata.MD{ctxMD}, mds...)
	return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...))
}

// A Table refers to a table in a Bigtable instance.
//
// A Table is safe to use concurrently.
type Table struct {
	c     *Client
	table string

	// Metadata to be sent with each request.
	md metadata.MD
}

// Open opens a table.
func (c *Client) Open(table string) *Table {
	return &Table{
		c:     c,
		table: table,
		md:    metadata.Pairs(resourcePrefixHeader, c.fullTableName(table), requestParamsHeader, c.requestParamsHeaderValue(table)),
	}
}

// TODO(dsymonds): Read method that returns a sequence of ReadItems.

// ReadRows reads rows from a table. f is called for each row.
// If f returns false, the stream is shut down and ReadRows returns.
// f owns its argument, and f is called serially in order by row key.
//
// By default, the yielded rows will contain all values in all cells.
// Use RowFilter to limit the cells returned.
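//
// A minimal usage sketch, assuming tbl came from Client.Open; the key range
// and family name "fam" are placeholders:
//
//	err := tbl.ReadRows(ctx, bigtable.NewRange("a", "z"), func(r bigtable.Row) bool {
//		for _, item := range r["fam"] {
//			fmt.Println(item.Column, string(item.Value))
//		}
//		return true // keep scanning
//	}, bigtable.RowFilter(bigtable.LatestNFilter(1)))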
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
	ctx = mergeOutgoingMetadata(ctx, t.md)
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
	defer func() { trace.EndSpan(ctx, err) }()

	var prevRowKey string
	attrMap := make(map[string]interface{})
	err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
		if !arg.valid() {
			// Empty row set, no need to make an API call.
			// NOTE: we must return early if arg == RowList{} because reading
			// an empty RowList from bigtable returns all rows from that table.
			return nil
		}
		req := &btpb.ReadRowsRequest{
			TableName:    t.c.fullTableName(t.table),
			AppProfileId: t.c.appProfile,
			Rows:         arg.proto(),
		}
		for _, opt := range opts {
			opt.set(req)
		}
		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
		defer cancel()

		startTime := time.Now()
		stream, err := t.c.client.ReadRows(ctx, req)
		if err != nil {
			return err
		}
		cr := newChunkReader()
		for {
			res, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				// Reset arg for the next Invoke call.
				arg = arg.retainRowsAfter(prevRowKey)
				attrMap["rowKey"] = prevRowKey
				attrMap["error"] = err.Error()
				attrMap["time_secs"] = time.Since(startTime).Seconds()
				trace.TracePrintf(ctx, attrMap, "Retry details in ReadRows")
				return err
			}
			attrMap["time_secs"] = time.Since(startTime).Seconds()
			attrMap["rowCount"] = len(res.Chunks)
			trace.TracePrintf(ctx, attrMap, "Details in ReadRows")

			for _, cc := range res.Chunks {
				row, err := cr.Process(cc)
				if err != nil {
					// No need to prepare for a retry; this error is not retryable.
					return err
				}
				if row == nil {
					continue
				}
				prevRowKey = row.Key()
				if !f(row) {
					// Cancel and drain stream.
					cancel()
					for {
						if _, err := stream.Recv(); err != nil {
							// The stream has ended. We don't return an error
							// because the caller has intentionally interrupted the scan.
							return nil
						}
					}
				}
			}
			if err := cr.Close(); err != nil {
				// No need to prepare for a retry; this error is not retryable.
				return err
			}
		}
		return err
	}, retryOptions...)

	return err
}

// ReadRow is a convenience implementation of a single-row reader.
// A missing row will return a zero-length map and a nil error.
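//
// For example (assuming tbl came from Client.Open; the row key and family
// name are placeholders):
//
//	row, err := tbl.ReadRow(ctx, "com.example/page1")
//	if err != nil {
//		// TODO: handle error.
//	}
//	for _, item := range row["fam"] {
//		fmt.Println(item.Column, string(item.Value))
//	}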
func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
	var r Row
	err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool {
		r = rr
		return true
	}, opts...)
	return r, err
}

// decodeFamilyProto adds the cell data from f to the given row.
func decodeFamilyProto(r Row, row string, f *btpb.Family) {
	fam := f.Name // does not have colon
	for _, col := range f.Columns {
		for _, cell := range col.Cells {
			ri := ReadItem{
				Row:       row,
				Column:    fam + ":" + string(col.Qualifier),
				Timestamp: Timestamp(cell.TimestampMicros),
				Value:     cell.Value,
			}
			r[fam] = append(r[fam], ri)
		}
	}
}

// RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList.
// The serialized size of the RowSet must be no larger than 1MiB.
type RowSet interface {
	proto() *btpb.RowSet

	// retainRowsAfter returns a new RowSet that does not include the
	// given row key or any row key lexicographically less than it.
	retainRowsAfter(lastRowKey string) RowSet

	// valid reports whether this set can cover at least one row.
	valid() bool
}

// RowList is a sequence of row keys.
type RowList []string

func (r RowList) proto() *btpb.RowSet {
	keys := make([][]byte, len(r))
	for i, row := range r {
		keys[i] = []byte(row)
	}
	return &btpb.RowSet{RowKeys: keys}
}

func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
	var retryKeys RowList
	for _, key := range r {
		if key > lastRowKey {
			retryKeys = append(retryKeys, key)
		}
	}
	return retryKeys
}

func (r RowList) valid() bool {
	return len(r) > 0
}

// A RowRange is a half-open interval [Start, Limit) encompassing
// all the rows with keys at least as large as Start, and less than Limit.
// (Bigtable string comparison is the same as Go's.)
// A RowRange can be unbounded, encompassing all keys at least as large as Start.
type RowRange struct {
	start string
	limit string
}

// NewRange returns the new RowRange [begin, end).
func NewRange(begin, end string) RowRange {
	return RowRange{
		start: begin,
		limit: end,
	}
}

// Unbounded tests whether a RowRange is unbounded.
func (r RowRange) Unbounded() bool {
	return r.limit == ""
}

// Contains reports whether the RowRange contains the key.
func (r RowRange) Contains(row string) bool {
	return r.start <= row && (r.limit == "" || r.limit > row)
}

// String provides a printable description of a RowRange.
func (r RowRange) String() string {
	a := strconv.Quote(r.start)
	if r.Unbounded() {
		return fmt.Sprintf("[%s,∞)", a)
	}
	return fmt.Sprintf("[%s,%q)", a, r.limit)
}

func (r RowRange) proto() *btpb.RowSet {
	rr := &btpb.RowRange{
		StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
	}
	if !r.Unbounded() {
		rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
	}
	return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
}

func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
	if lastRowKey == "" || lastRowKey < r.start {
		return r
	}
	// Set the beginning of the range to the row after the last scanned.
	start := lastRowKey + "\x00"
	if r.Unbounded() {
		return InfiniteRange(start)
	}
	return NewRange(start, r.limit)
}

func (r RowRange) valid() bool {
	return r.Unbounded() || r.start < r.limit
}

// RowRangeList is a sequence of RowRanges representing the union of the ranges.
type RowRangeList []RowRange

func (r RowRangeList) proto() *btpb.RowSet {
	ranges := make([]*btpb.RowRange, len(r))
	for i, rr := range r {
		// RowRange.proto() returns a RowSet whose RowRanges field holds a single element.
		ranges[i] = rr.proto().RowRanges[0]
	}
	return &btpb.RowSet{RowRanges: ranges}
}

func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
	if lastRowKey == "" {
		return r
	}
	// Return a list of any range that has not yet been completely processed.
	var ranges RowRangeList
	for _, rr := range r {
		retained := rr.retainRowsAfter(lastRowKey)
		if retained.valid() {
			ranges = append(ranges, retained.(RowRange))
		}
	}
	return ranges
}

func (r RowRangeList) valid() bool {
	for _, rr := range r {
		if rr.valid() {
			return true
		}
	}
	return false
}

// SingleRow returns a RowSet for reading a single row.
func SingleRow(row string) RowSet {
	return RowList{row}
}

// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
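//
// For example, a sketch that scans every row whose key starts with a
// placeholder prefix "user#" (tbl is assumed to come from Client.Open):
//
//	err := tbl.ReadRows(ctx, bigtable.PrefixRange("user#"), func(r bigtable.Row) bool {
//		fmt.Println(r.Key())
//		return true
//	})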
func PrefixRange(prefix string) RowRange {
	return RowRange{
		start: prefix,
		limit: prefixSuccessor(prefix),
	}
}

// InfiniteRange returns the RowRange consisting of all keys at least as
// large as start.
func InfiniteRange(start string) RowRange {
	return RowRange{
		start: start,
		limit: "",
	}
}

// prefixSuccessor returns the lexically smallest string greater than the
// prefix, if it exists, or "" otherwise. In either case, it is the string
// needed for the Limit of a RowRange.
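//
// Worked examples (illustrative only):
//
//	prefixSuccessor("abc")      == "abd"
//	prefixSuccessor("ab\xff")   == "ac"
//	prefixSuccessor("\xff\xff") == "" // no successor; the range is unbounded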
func prefixSuccessor(prefix string) string {
	if prefix == "" {
		return "" // infinite range
	}
	n := len(prefix)
	for n--; n >= 0 && prefix[n] == '\xff'; n-- {
	}
	if n == -1 {
		return ""
	}
	ans := []byte(prefix[:n])
	ans = append(ans, prefix[n]+1)
	return string(ans)
}

// A ReadOption is an optional argument to ReadRows.
type ReadOption interface {
	set(req *btpb.ReadRowsRequest)
}

// RowFilter returns a ReadOption that applies f to the contents of read rows.
//
// If multiple RowFilters are provided, only the last is used. To combine filters,
// use ChainFilters or InterleaveFilters instead.
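//
// A minimal sketch combining read options; tbl, f, and the family name "fam"
// are placeholders from the caller's code:
//
//	err := tbl.ReadRows(ctx, bigtable.PrefixRange("user#"), f,
//		bigtable.RowFilter(bigtable.ChainFilters(
//			bigtable.FamilyFilter("fam"),
//			bigtable.LatestNFilter(1),
//		)),
//		bigtable.LimitRows(100),
//	)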
func RowFilter(f Filter) ReadOption { return rowFilter{f} }

type rowFilter struct{ f Filter }

func (rf rowFilter) set(req *btpb.ReadRowsRequest) { req.Filter = rf.f.proto() }

// LimitRows returns a ReadOption that will limit the number of rows to be read.
func LimitRows(limit int64) ReadOption { return limitRows{limit} }

type limitRows struct{ limit int64 }

func (lr limitRows) set(req *btpb.ReadRowsRequest) { req.RowsLimit = lr.limit }

// mutationsAreRetryable reports whether all mutations are idempotent and
// therefore retryable. A mutation is idempotent iff all of its SetCell
// operations carry an explicit timestamp and do not rely on the server
// assigning one.
func mutationsAreRetryable(muts []*btpb.Mutation) bool {
	serverTime := int64(ServerTime)
	for _, mut := range muts {
		setCell := mut.GetSetCell()
		if setCell != nil && setCell.TimestampMicros == serverTime {
			return false
		}
	}
	return true
}

const maxMutations = 100000

// Apply mutates a row atomically. A mutation must contain at least one
// operation and at most 100,000 operations.
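//
// A minimal sketch, assuming tbl came from Client.Open; the family, column,
// and row key are placeholders:
//
//	mut := bigtable.NewMutation()
//	mut.Set("fam", "col", bigtable.Now(), []byte("value"))
//	if err := tbl.Apply(ctx, "com.example/page1", mut); err != nil {
//		// TODO: handle error.
//	}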
func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) {
	ctx = mergeOutgoingMetadata(ctx, t.md)
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
	defer func() { trace.EndSpan(ctx, err) }()

	after := func(res proto.Message) {
		for _, o := range opts {
			o.after(res)
		}
	}

	var callOptions []gax.CallOption
	if m.cond == nil {
		req := &btpb.MutateRowRequest{
			TableName:    t.c.fullTableName(t.table),
			AppProfileId: t.c.appProfile,
			RowKey:       []byte(row),
			Mutations:    m.ops,
		}
		if mutationsAreRetryable(m.ops) {
			callOptions = retryOptions
		}
		var res *btpb.MutateRowResponse
		err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
			var err error
			res, err = t.c.client.MutateRow(ctx, req)
			return err
		}, callOptions...)
		if err == nil {
			after(res)
		}
		return err
	}

	req := &btpb.CheckAndMutateRowRequest{
		TableName:       t.c.fullTableName(t.table),
		AppProfileId:    t.c.appProfile,
		RowKey:          []byte(row),
		PredicateFilter: m.cond.proto(),
	}
	if m.mtrue != nil {
		if m.mtrue.cond != nil {
			return errors.New("bigtable: conditional mutations cannot be nested")
		}
		req.TrueMutations = m.mtrue.ops
	}
	if m.mfalse != nil {
		if m.mfalse.cond != nil {
			return errors.New("bigtable: conditional mutations cannot be nested")
		}
		req.FalseMutations = m.mfalse.ops
	}
	if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) {
		callOptions = retryOptions
	}
	var cmRes *btpb.CheckAndMutateRowResponse
	err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
		var err error
		cmRes, err = t.c.client.CheckAndMutateRow(ctx, req)
		return err
	}, callOptions...)
	if err == nil {
		after(cmRes)
	}
	return err
}

// An ApplyOption is an optional argument to Apply.
type ApplyOption interface {
	after(res proto.Message)
}

type applyAfterFunc func(res proto.Message)

func (a applyAfterFunc) after(res proto.Message) { a(res) }

// GetCondMutationResult returns an ApplyOption that reports whether the conditional
// mutation's condition matched.
func GetCondMutationResult(matched *bool) ApplyOption {
	return applyAfterFunc(func(res proto.Message) {
		if res, ok := res.(*btpb.CheckAndMutateRowResponse); ok {
			*matched = res.PredicateMatched
		}
	})
}

// Mutation represents a set of changes for a single row of a table.
type Mutation struct {
	ops []*btpb.Mutation

	// for conditional mutations
	cond          Filter
	mtrue, mfalse *Mutation
}

// NewMutation returns a new mutation.
func NewMutation() *Mutation {
	return new(Mutation)
}

// NewCondMutation returns a conditional mutation.
// The given row filter determines which mutation is applied:
// If the filter matches any cell in the row, mtrue is applied;
// otherwise, mfalse is applied.
// Either given mutation may be nil.
//
// The application of a conditional mutation is atomic; concurrent conditional
// mutations on the same row will be executed serially by the server.
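//
// A minimal sketch, assuming tbl came from Client.Open; the filter, family,
// column, and row key are placeholders:
//
//	cond := bigtable.ChainFilters(bigtable.FamilyFilter("fam"), bigtable.ColumnFilter("col"))
//	upd := bigtable.NewMutation()
//	upd.Set("fam", "col", bigtable.Now(), []byte("updated"))
//	var matched bool
//	err := tbl.Apply(ctx, "com.example/page1",
//		bigtable.NewCondMutation(cond, upd, nil),
//		bigtable.GetCondMutationResult(&matched))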
func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation {
	return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse}
}

// Set sets a value in a specified column, with the given timestamp.
// The timestamp will be truncated to millisecond granularity.
// A timestamp of ServerTime means to use the server timestamp.
func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) {
	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
		FamilyName:      family,
		ColumnQualifier: []byte(column),
		TimestampMicros: int64(ts.TruncateToMilliseconds()),
		Value:           value,
	}}})
}

// DeleteCellsInColumn will delete all the cells whose columns are family:column.
func (m *Mutation) DeleteCellsInColumn(family, column string) {
	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
		FamilyName:      family,
		ColumnQualifier: []byte(column),
	}}})
}

// DeleteTimestampRange deletes all cells whose columns are family:column
// and whose timestamps are in the half-open interval [start, end).
// If end is zero, it will be interpreted as infinity.
// The timestamps will be truncated to millisecond granularity.
func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) {
	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
		FamilyName:      family,
		ColumnQualifier: []byte(column),
		TimeRange: &btpb.TimestampRange{
			StartTimestampMicros: int64(start.TruncateToMilliseconds()),
			EndTimestampMicros:   int64(end.TruncateToMilliseconds()),
		},
	}}})
}

// DeleteCellsInFamily will delete all the cells whose columns are family:*.
func (m *Mutation) DeleteCellsInFamily(family string) {
	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{
		FamilyName: family,
	}}})
}

// DeleteRow deletes the entire row.
func (m *Mutation) DeleteRow() {
	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}})
}

// entryErr is a container that combines an entry with the error that was returned for it.
// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
type entryErr struct {
	Entry *btpb.MutateRowsRequest_Entry
	Err   error
}

// ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
// Each mutation is individually applied atomically,
// but the set of mutations may be applied in any order.
//
// Two types of failures may occur. If the entire process
// fails, (nil, err) will be returned. If specific mutations
// fail to apply, ([]err, nil) will be returned, and the errors
// will correspond to the relevant rowKeys/muts arguments.
//
// Conditional mutations cannot be applied in bulk and providing one will result in an error.
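//
// A minimal sketch, assuming tbl came from Client.Open; the row keys, family,
// and column are placeholders:
//
//	keys := []string{"row1", "row2"}
//	muts := make([]*bigtable.Mutation, len(keys))
//	for i := range muts {
//		m := bigtable.NewMutation()
//		m.Set("fam", "col", bigtable.Now(), []byte("value"))
//		muts[i] = m
//	}
//	errs, err := tbl.ApplyBulk(ctx, keys, muts)
//	if err != nil {
//		// TODO: handle the overall failure.
//	}
//	for i, e := range errs {
//		if e != nil {
//			// The mutation for keys[i] failed.
//		}
//	}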
func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
	ctx = mergeOutgoingMetadata(ctx, t.md)
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
	defer func() { trace.EndSpan(ctx, err) }()

	if len(rowKeys) != len(muts) {
		return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
	}

	origEntries := make([]*entryErr, len(rowKeys))
	for i, key := range rowKeys {
		mut := muts[i]
		if mut.cond != nil {
			return nil, errors.New("conditional mutations cannot be applied in bulk")
		}
		origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
	}

	for _, group := range groupEntries(origEntries, maxMutations) {
		attrMap := make(map[string]interface{})
		err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
			attrMap["rowCount"] = len(group)
			trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
			err := t.doApplyBulk(ctx, group, opts...)
			if err != nil {
				// We want to retry the entire request with the current group.
				return err
			}
			group = t.getApplyBulkRetries(group)
			if len(group) > 0 && len(idempotentRetryCodes) > 0 {
				// We have at least one mutation that needs to be retried.
				// Return an arbitrary error that is retryable according to retryOptions.
				return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
			}
			return nil
		}, retryOptions...)
		if err != nil {
			return nil, err
		}
	}

	// All the errors are accumulated into an array and returned, interspersed with nils for successful
	// entries. The absence of any errors means we should return nil.
	var foundErr bool
	for _, entry := range origEntries {
		if entry.Err != nil {
			foundErr = true
		}
		errs = append(errs, entry.Err)
	}
	if foundErr {
		return errs, nil
	}
	return nil, nil
}

// getApplyBulkRetries returns the entries that need to be retried.
func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
	var retryEntries []*entryErr
	for _, entry := range entries {
		err := entry.Err
		if err != nil && isIdempotentRetryCode[status.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) {
			// There was an error and the entry is retryable.
			retryEntries = append(retryEntries, entry)
		}
	}
	return retryEntries
}

// doApplyBulk does the work of a single ApplyBulk invocation.
func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error {
	after := func(res proto.Message) {
		for _, o := range opts {
			o.after(res)
		}
	}

	entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
	for i, entryErr := range entryErrs {
		entries[i] = entryErr.Entry
	}
	req := &btpb.MutateRowsRequest{
		TableName:    t.c.fullTableName(t.table),
		AppProfileId: t.c.appProfile,
		Entries:      entries,
	}
	stream, err := t.c.client.MutateRows(ctx, req)
	if err != nil {
		return err
	}
	for {
		res, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}

		for i, entry := range res.Entries {
			s := entry.Status
			if s.Code == int32(codes.OK) {
				entryErrs[i].Err = nil
			} else {
				entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message)
			}
		}
		after(res)
	}
	return nil
}

// groupEntries splits entries into groups whose total mutation count does not
// exceed maxSize, without splitting an individual entry across groups.
func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
	var (
		res   [][]*entryErr
		start int
		gmuts int
	)
	addGroup := func(end int) {
		if end-start > 0 {
			res = append(res, entries[start:end])
			start = end
			gmuts = 0
		}
	}
	for i, e := range entries {
		emuts := len(e.Entry.Mutations)
		if gmuts+emuts > maxSize {
			addGroup(i)
		}
		gmuts += emuts
	}
	addGroup(len(entries))
	return res
}

// Timestamp is in units of microseconds since 1 January 1970.
type Timestamp int64

// ServerTime is a specific Timestamp that may be passed to (*Mutation).Set.
// It indicates that the server's timestamp should be used.
const ServerTime Timestamp = -1

// Time converts a time.Time into a Timestamp.
func Time(t time.Time) Timestamp { return Timestamp(t.UnixNano() / 1e3) }

// Now returns the Timestamp representation of the current time on the client.
func Now() Timestamp { return Time(time.Now()) }

// Time converts a Timestamp into a time.Time.
func (ts Timestamp) Time() time.Time { return time.Unix(int64(ts)/1e6, int64(ts)%1e6*1e3) }

// TruncateToMilliseconds truncates a Timestamp to millisecond granularity,
// which is currently the only granularity supported.
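//
// For example, Timestamp(1600000000123456).TruncateToMilliseconds() == 1600000000123000,
// while ServerTime is passed through unchanged.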
func (ts Timestamp) TruncateToMilliseconds() Timestamp {
	if ts == ServerTime {
		return ts
	}
	return ts - ts%1000
}

// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
// It returns the newly written cells.
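//
// A minimal sketch, assuming tbl came from Client.Open; the family, columns,
// and row key are placeholders:
//
//	rmw := bigtable.NewReadModifyWrite()
//	rmw.Increment("fam", "counter", 1)
//	rmw.AppendValue("fam", "log", []byte("+1"))
//	row, err := tbl.ApplyReadModifyWrite(ctx, "com.example/page1", rmw)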
func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
	ctx = mergeOutgoingMetadata(ctx, t.md)
	req := &btpb.ReadModifyWriteRowRequest{
		TableName:    t.c.fullTableName(t.table),
		AppProfileId: t.c.appProfile,
		RowKey:       []byte(row),
		Rules:        m.ops,
	}
	res, err := t.c.client.ReadModifyWriteRow(ctx, req)
	if err != nil {
		return nil, err
	}
	if res.Row == nil {
		return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil")
	}
	r := make(Row)
	for _, fam := range res.Row.Families { // res.Row is a *btpb.Row, fam is a *btpb.Family
		decodeFamilyProto(r, row, fam)
	}
	return r, nil
}

// ReadModifyWrite represents a set of operations on a single row of a table.
// It is like Mutation but for non-idempotent changes.
// When applied, these operations operate on the latest values of the row's cells,
// and result in a new value being written to the relevant cell with a timestamp
// that is max(existing timestamp, current server time).
//
// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will
// be executed serially by the server.
type ReadModifyWrite struct {
	ops []*btpb.ReadModifyWriteRule
}

// NewReadModifyWrite returns a new ReadModifyWrite.
func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) }

// AppendValue appends a value to a specific cell's value.
// If the cell is unset, it will be treated as an empty value.
func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) {
	m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
		FamilyName:      family,
		ColumnQualifier: []byte(column),
		Rule:            &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v},
	})
}

// Increment interprets the value in a specific cell as a 64-bit big-endian signed integer,
// and adds a value to it. If the cell is unset, it will be treated as zero.
// If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite
// operation will fail.
func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
	m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
		FamilyName:      family,
		ColumnQualifier: []byte(column),
		Rule:            &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta},
	})
}

// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of
// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces.
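//
// A minimal sketch, e.g. for splitting a scan across workers (tbl is assumed
// to come from Client.Open):
//
//	keys, err := tbl.SampleRowKeys(ctx)
//	if err != nil {
//		// TODO: handle error.
//	}
//	for _, k := range keys {
//		fmt.Println(k)
//	}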
func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
	ctx = mergeOutgoingMetadata(ctx, t.md)
	var sampledRowKeys []string
	err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
		sampledRowKeys = nil
		req := &btpb.SampleRowKeysRequest{
			TableName:    t.c.fullTableName(t.table),
			AppProfileId: t.c.appProfile,
		}
		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
		defer cancel()

		stream, err := t.c.client.SampleRowKeys(ctx, req)
		if err != nil {
			return err
		}
		for {
			res, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				return err
			}

			key := string(res.RowKey)
			if key == "" {
				continue
			}

			sampledRowKeys = append(sampledRowKeys, key)
		}
		return nil
	}, retryOptions...)
	return sampledRowKeys, err
}