1/*
2Copyright 2015 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package bigtable // import "cloud.google.com/go/bigtable"
18
19import (
20	"context"
21	"errors"
22	"fmt"
23	"io"
24	"strconv"
25	"time"
26
27	"cloud.google.com/go/bigtable/internal/gax"
28	btopt "cloud.google.com/go/bigtable/internal/option"
29	"cloud.google.com/go/internal/trace"
30	"github.com/golang/protobuf/proto"
31	"google.golang.org/api/option"
32	gtransport "google.golang.org/api/transport/grpc"
33	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
34	"google.golang.org/grpc"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/metadata"
37	"google.golang.org/grpc/status"
38)
39
40const prodAddr = "bigtable.googleapis.com:443"
41
42// Client is a client for reading and writing data to tables in an instance.
43//
44// A Client is safe to use concurrently, except for its Close method.
45type Client struct {
46	conn              *grpc.ClientConn
47	client            btpb.BigtableClient
48	project, instance string
49	appProfile        string
50}
51
52// ClientConfig has configurations for the client.
53type ClientConfig struct {
54	// The id of the app profile to associate with all data operations sent from this client.
55	// If unspecified, the default app profile for the instance will be used.
56	AppProfile string
57}
58
59// NewClient creates a new Client for a given project and instance.
60// The default ClientConfig will be used.
61func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) {
62	return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...)
63}
64
65// NewClientWithConfig creates a new client with the given config.
66func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
67	o, err := btopt.DefaultClientOptions(prodAddr, Scope, clientUserAgent)
68	if err != nil {
69		return nil, err
70	}
71	// Default to a small connection pool that can be overridden.
72	o = append(o,
73		option.WithGRPCConnectionPool(4),
74		// Set the max size to correspond to server-side limits.
75		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))),
76		// TODO(grpc/grpc-go#1388) using connection pool without WithBlock
77		// can cause RPCs to fail randomly. We can delete this after the issue is fixed.
78		option.WithGRPCDialOption(grpc.WithBlock()))
79	o = append(o, opts...)
80	conn, err := gtransport.Dial(ctx, o...)
81	if err != nil {
82		return nil, fmt.Errorf("dialing: %v", err)
83	}
84
85	return &Client{
86		conn:       conn,
87		client:     btpb.NewBigtableClient(conn),
88		project:    project,
89		instance:   instance,
90		appProfile: config.AppProfile,
91	}, nil
92}
93
94// Close closes the Client.
95func (c *Client) Close() error {
96	return c.conn.Close()
97}
98
99var (
100	idempotentRetryCodes  = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted}
101	isIdempotentRetryCode = make(map[codes.Code]bool)
102	retryOptions          = []gax.CallOption{
103		gax.WithDelayTimeoutSettings(100*time.Millisecond, 2000*time.Millisecond, 1.2),
104		gax.WithRetryCodes(idempotentRetryCodes),
105	}
106)
107
108func init() {
109	for _, code := range idempotentRetryCodes {
110		isIdempotentRetryCode[code] = true
111	}
112}
113
114func (c *Client) fullTableName(table string) string {
115	return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table)
116}
117
118// A Table refers to a table.
119//
120// A Table is safe to use concurrently.
121type Table struct {
122	c     *Client
123	table string
124
125	// Metadata to be sent with each request.
126	md metadata.MD
127}
128
129// Open opens a table.
130func (c *Client) Open(table string) *Table {
131	return &Table{
132		c:     c,
133		table: table,
134		md:    metadata.Pairs(resourcePrefixHeader, c.fullTableName(table)),
135	}
136}
137
138// TODO(dsymonds): Read method that returns a sequence of ReadItems.
139
140// ReadRows reads rows from a table. f is called for each row.
141// If f returns false, the stream is shut down and ReadRows returns.
142// f owns its argument, and f is called serially in order by row key.
143//
144// By default, the yielded rows will contain all values in all cells.
145// Use RowFilter to limit the cells returned.
146func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
147	ctx = mergeOutgoingMetadata(ctx, t.md)
148	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
149	defer func() { trace.EndSpan(ctx, err) }()
150
151	var prevRowKey string
152	attrMap := make(map[string]interface{})
153	err = gax.Invoke(ctx, func(ctx context.Context) error {
154		if !arg.valid() {
155			// Empty row set, no need to make an API call.
156			// NOTE: we must return early if arg == RowList{} because reading
157			// an empty RowList from bigtable returns all rows from that table.
158			return nil
159		}
160		req := &btpb.ReadRowsRequest{
161			TableName:    t.c.fullTableName(t.table),
162			AppProfileId: t.c.appProfile,
163			Rows:         arg.proto(),
164		}
165		for _, opt := range opts {
166			opt.set(req)
167		}
168		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
169		defer cancel()
170
171		startTime := time.Now()
172		stream, err := t.c.client.ReadRows(ctx, req)
173		if err != nil {
174			return err
175		}
176		cr := newChunkReader()
177		for {
178			res, err := stream.Recv()
179			if err == io.EOF {
180				break
181			}
182			if err != nil {
183				// Reset arg for next Invoke call.
184				arg = arg.retainRowsAfter(prevRowKey)
185				attrMap["rowKey"] = prevRowKey
186				attrMap["error"] = err.Error()
187				attrMap["time_secs"] = time.Since(startTime).Seconds()
188				trace.TracePrintf(ctx, attrMap, "Retry details in ReadRows")
189				return err
190			}
191			attrMap["time_secs"] = time.Since(startTime).Seconds()
192			attrMap["rowCount"] = len(res.Chunks)
193			trace.TracePrintf(ctx, attrMap, "Details in ReadRows")
194
195			for _, cc := range res.Chunks {
196				row, err := cr.Process(cc)
197				if err != nil {
198					// No need to prepare for a retry, this is an unretryable error.
199					return err
200				}
201				if row == nil {
202					continue
203				}
204				prevRowKey = row.Key()
205				if !f(row) {
206					// Cancel and drain stream.
207					cancel()
208					for {
209						if _, err := stream.Recv(); err != nil {
210							// The stream has ended. We don't return an error
211							// because the caller has intentionally interrupted the scan.
212							return nil
213						}
214					}
215				}
216			}
217			if err := cr.Close(); err != nil {
218				// No need to prepare for a retry, this is an unretryable error.
219				return err
220			}
221		}
222		return err
223	}, retryOptions...)
224
225	return err
226}
227
228// ReadRow is a convenience implementation of a single-row reader.
229// A missing row will return a zero-length map and a nil error.
230func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
231	var r Row
232	err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool {
233		r = rr
234		return true
235	}, opts...)
236	return r, err
237}
238
239// decodeFamilyProto adds the cell data from f to the given row.
240func decodeFamilyProto(r Row, row string, f *btpb.Family) {
241	fam := f.Name // does not have colon
242	for _, col := range f.Columns {
243		for _, cell := range col.Cells {
244			ri := ReadItem{
245				Row:       row,
246				Column:    fam + ":" + string(col.Qualifier),
247				Timestamp: Timestamp(cell.TimestampMicros),
248				Value:     cell.Value,
249			}
250			r[fam] = append(r[fam], ri)
251		}
252	}
253}
254
255// RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList.
256// The serialized size of the RowSet must be no larger than 1MiB.
257type RowSet interface {
258	proto() *btpb.RowSet
259
260	// retainRowsAfter returns a new RowSet that does not include the
261	// given row key or any row key lexicographically less than it.
262	retainRowsAfter(lastRowKey string) RowSet
263
264	// Valid reports whether this set can cover at least one row.
265	valid() bool
266}
267
268// RowList is a sequence of row keys.
269type RowList []string
270
271func (r RowList) proto() *btpb.RowSet {
272	keys := make([][]byte, len(r))
273	for i, row := range r {
274		keys[i] = []byte(row)
275	}
276	return &btpb.RowSet{RowKeys: keys}
277}
278
279func (r RowList) retainRowsAfter(lastRowKey string) RowSet {
280	var retryKeys RowList
281	for _, key := range r {
282		if key > lastRowKey {
283			retryKeys = append(retryKeys, key)
284		}
285	}
286	return retryKeys
287}
288
289func (r RowList) valid() bool {
290	return len(r) > 0
291}
292
293// A RowRange is a half-open interval [Start, Limit) encompassing
294// all the rows with keys at least as large as Start, and less than Limit.
295// (Bigtable string comparison is the same as Go's.)
296// A RowRange can be unbounded, encompassing all keys at least as large as Start.
297type RowRange struct {
298	start string
299	limit string
300}
301
302// NewRange returns the new RowRange [begin, end).
303func NewRange(begin, end string) RowRange {
304	return RowRange{
305		start: begin,
306		limit: end,
307	}
308}
309
310// Unbounded tests whether a RowRange is unbounded.
311func (r RowRange) Unbounded() bool {
312	return r.limit == ""
313}
314
315// Contains says whether the RowRange contains the key.
316func (r RowRange) Contains(row string) bool {
317	return r.start <= row && (r.limit == "" || r.limit > row)
318}
319
320// String provides a printable description of a RowRange.
321func (r RowRange) String() string {
322	a := strconv.Quote(r.start)
323	if r.Unbounded() {
324		return fmt.Sprintf("[%s,∞)", a)
325	}
326	return fmt.Sprintf("[%s,%q)", a, r.limit)
327}
328
329func (r RowRange) proto() *btpb.RowSet {
330	rr := &btpb.RowRange{
331		StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)},
332	}
333	if !r.Unbounded() {
334		rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)}
335	}
336	return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}}
337}
338
339func (r RowRange) retainRowsAfter(lastRowKey string) RowSet {
340	if lastRowKey == "" || lastRowKey < r.start {
341		return r
342	}
343	// Set the beginning of the range to the row after the last scanned.
344	start := lastRowKey + "\x00"
345	if r.Unbounded() {
346		return InfiniteRange(start)
347	}
348	return NewRange(start, r.limit)
349}
350
351func (r RowRange) valid() bool {
352	return r.Unbounded() || r.start < r.limit
353}
354
355// RowRangeList is a sequence of RowRanges representing the union of the ranges.
356type RowRangeList []RowRange
357
358func (r RowRangeList) proto() *btpb.RowSet {
359	ranges := make([]*btpb.RowRange, len(r))
360	for i, rr := range r {
361		// RowRange.proto() returns a RowSet with a single element RowRange array
362		ranges[i] = rr.proto().RowRanges[0]
363	}
364	return &btpb.RowSet{RowRanges: ranges}
365}
366
367func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet {
368	if lastRowKey == "" {
369		return r
370	}
371	// Return a list of any range that has not yet been completely processed
372	var ranges RowRangeList
373	for _, rr := range r {
374		retained := rr.retainRowsAfter(lastRowKey)
375		if retained.valid() {
376			ranges = append(ranges, retained.(RowRange))
377		}
378	}
379	return ranges
380}
381
382func (r RowRangeList) valid() bool {
383	for _, rr := range r {
384		if rr.valid() {
385			return true
386		}
387	}
388	return false
389}
390
391// SingleRow returns a RowSet for reading a single row.
392func SingleRow(row string) RowSet {
393	return RowList{row}
394}
395
396// PrefixRange returns a RowRange consisting of all keys starting with the prefix.
397func PrefixRange(prefix string) RowRange {
398	return RowRange{
399		start: prefix,
400		limit: prefixSuccessor(prefix),
401	}
402}
403
404// InfiniteRange returns the RowRange consisting of all keys at least as
405// large as start.
406func InfiniteRange(start string) RowRange {
407	return RowRange{
408		start: start,
409		limit: "",
410	}
411}
412
// prefixSuccessor returns the lexically smallest string greater than every
// string that starts with prefix, if it exists, or "" otherwise. In either
// case, it is the string needed for the Limit of a RowRange.
func prefixSuccessor(prefix string) string {
	// Trailing 0xff bytes cannot be incremented without a carry, so drop them.
	end := len(prefix)
	for end > 0 && prefix[end-1] == 0xff {
		end--
	}
	if end == 0 {
		// Empty prefix, or only 0xff bytes: no finite successor exists.
		return ""
	}
	succ := []byte(prefix[:end])
	succ[end-1]++ // cannot overflow: the byte is not 0xff
	return string(succ)
}
430
431// A ReadOption is an optional argument to ReadRows.
432type ReadOption interface {
433	set(req *btpb.ReadRowsRequest)
434}
435
436// RowFilter returns a ReadOption that applies f to the contents of read rows.
437//
438// If multiple RowFilters are provided, only the last is used. To combine filters,
439// use ChainFilters or InterleaveFilters instead.
440func RowFilter(f Filter) ReadOption { return rowFilter{f} }
441
442type rowFilter struct{ f Filter }
443
444func (rf rowFilter) set(req *btpb.ReadRowsRequest) { req.Filter = rf.f.proto() }
445
446// LimitRows returns a ReadOption that will limit the number of rows to be read.
447func LimitRows(limit int64) ReadOption { return limitRows{limit} }
448
449type limitRows struct{ limit int64 }
450
451func (lr limitRows) set(req *btpb.ReadRowsRequest) { req.RowsLimit = lr.limit }
452
453// mutationsAreRetryable returns true if all mutations are idempotent
454// and therefore retryable. A mutation is idempotent iff all cell timestamps
455// have an explicit timestamp set and do not rely on the timestamp being set on the server.
456func mutationsAreRetryable(muts []*btpb.Mutation) bool {
457	serverTime := int64(ServerTime)
458	for _, mut := range muts {
459		setCell := mut.GetSetCell()
460		if setCell != nil && setCell.TimestampMicros == serverTime {
461			return false
462		}
463	}
464	return true
465}
466
467const maxMutations = 100000
468
469// Apply mutates a row atomically. A mutation must contain at least one
470// operation and at most 100000 operations.
471func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) {
472	ctx = mergeOutgoingMetadata(ctx, t.md)
473	after := func(res proto.Message) {
474		for _, o := range opts {
475			o.after(res)
476		}
477	}
478
479	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply")
480	defer func() { trace.EndSpan(ctx, err) }()
481	var callOptions []gax.CallOption
482	if m.cond == nil {
483		req := &btpb.MutateRowRequest{
484			TableName:    t.c.fullTableName(t.table),
485			AppProfileId: t.c.appProfile,
486			RowKey:       []byte(row),
487			Mutations:    m.ops,
488		}
489		if mutationsAreRetryable(m.ops) {
490			callOptions = retryOptions
491		}
492		var res *btpb.MutateRowResponse
493		err := gax.Invoke(ctx, func(ctx context.Context) error {
494			var err error
495			res, err = t.c.client.MutateRow(ctx, req)
496			return err
497		}, callOptions...)
498		if err == nil {
499			after(res)
500		}
501		return err
502	}
503
504	req := &btpb.CheckAndMutateRowRequest{
505		TableName:       t.c.fullTableName(t.table),
506		AppProfileId:    t.c.appProfile,
507		RowKey:          []byte(row),
508		PredicateFilter: m.cond.proto(),
509	}
510	if m.mtrue != nil {
511		if m.mtrue.cond != nil {
512			return errors.New("bigtable: conditional mutations cannot be nested")
513		}
514		req.TrueMutations = m.mtrue.ops
515	}
516	if m.mfalse != nil {
517		if m.mfalse.cond != nil {
518			return errors.New("bigtable: conditional mutations cannot be nested")
519		}
520		req.FalseMutations = m.mfalse.ops
521	}
522	if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) {
523		callOptions = retryOptions
524	}
525	var cmRes *btpb.CheckAndMutateRowResponse
526	err = gax.Invoke(ctx, func(ctx context.Context) error {
527		var err error
528		cmRes, err = t.c.client.CheckAndMutateRow(ctx, req)
529		return err
530	}, callOptions...)
531	if err == nil {
532		after(cmRes)
533	}
534	return err
535}
536
537// An ApplyOption is an optional argument to Apply.
538type ApplyOption interface {
539	after(res proto.Message)
540}
541
542type applyAfterFunc func(res proto.Message)
543
544func (a applyAfterFunc) after(res proto.Message) { a(res) }
545
546// GetCondMutationResult returns an ApplyOption that reports whether the conditional
547// mutation's condition matched.
548func GetCondMutationResult(matched *bool) ApplyOption {
549	return applyAfterFunc(func(res proto.Message) {
550		if res, ok := res.(*btpb.CheckAndMutateRowResponse); ok {
551			*matched = res.PredicateMatched
552		}
553	})
554}
555
556// Mutation represents a set of changes for a single row of a table.
557type Mutation struct {
558	ops []*btpb.Mutation
559
560	// for conditional mutations
561	cond          Filter
562	mtrue, mfalse *Mutation
563}
564
565// NewMutation returns a new mutation.
566func NewMutation() *Mutation {
567	return new(Mutation)
568}
569
570// NewCondMutation returns a conditional mutation.
571// The given row filter determines which mutation is applied:
572// If the filter matches any cell in the row, mtrue is applied;
573// otherwise, mfalse is applied.
574// Either given mutation may be nil.
575func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation {
576	return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse}
577}
578
579// Set sets a value in a specified column, with the given timestamp.
580// The timestamp will be truncated to millisecond granularity.
581// A timestamp of ServerTime means to use the server timestamp.
582func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) {
583	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{
584		FamilyName:      family,
585		ColumnQualifier: []byte(column),
586		TimestampMicros: int64(ts.TruncateToMilliseconds()),
587		Value:           value,
588	}}})
589}
590
591// DeleteCellsInColumn will delete all the cells whose columns are family:column.
592func (m *Mutation) DeleteCellsInColumn(family, column string) {
593	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
594		FamilyName:      family,
595		ColumnQualifier: []byte(column),
596	}}})
597}
598
599// DeleteTimestampRange deletes all cells whose columns are family:column
600// and whose timestamps are in the half-open interval [start, end).
601// If end is zero, it will be interpreted as infinity.
602// The timestamps will be truncated to millisecond granularity.
603func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) {
604	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{
605		FamilyName:      family,
606		ColumnQualifier: []byte(column),
607		TimeRange: &btpb.TimestampRange{
608			StartTimestampMicros: int64(start.TruncateToMilliseconds()),
609			EndTimestampMicros:   int64(end.TruncateToMilliseconds()),
610		},
611	}}})
612}
613
614// DeleteCellsInFamily will delete all the cells whose columns are family:*.
615func (m *Mutation) DeleteCellsInFamily(family string) {
616	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{
617		FamilyName: family,
618	}}})
619}
620
621// DeleteRow deletes the entire row.
622func (m *Mutation) DeleteRow() {
623	m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}})
624}
625
626// entryErr is a container that combines an entry with the error that was returned for it.
627// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed.
628type entryErr struct {
629	Entry *btpb.MutateRowsRequest_Entry
630	Err   error
631}
632
633// ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
634// Each mutation is individually applied atomically,
635// but the set of mutations may be applied in any order.
636//
637// Two types of failures may occur. If the entire process
638// fails, (nil, err) will be returned. If specific mutations
639// fail to apply, ([]err, nil) will be returned, and the errors
640// will correspond to the relevant rowKeys/muts arguments.
641//
642// Conditional mutations cannot be applied in bulk and providing one will result in an error.
643func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) {
644	ctx = mergeOutgoingMetadata(ctx, t.md)
645	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk")
646	defer func() { trace.EndSpan(ctx, err) }()
647
648	if len(rowKeys) != len(muts) {
649		return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts))
650	}
651
652	origEntries := make([]*entryErr, len(rowKeys))
653	for i, key := range rowKeys {
654		mut := muts[i]
655		if mut.cond != nil {
656			return nil, errors.New("conditional mutations cannot be applied in bulk")
657		}
658		origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
659	}
660
661	for _, group := range groupEntries(origEntries, maxMutations) {
662		attrMap := make(map[string]interface{})
663		err = gax.Invoke(ctx, func(ctx context.Context) error {
664			attrMap["rowCount"] = len(group)
665			trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk")
666			err := t.doApplyBulk(ctx, group, opts...)
667			if err != nil {
668				// We want to retry the entire request with the current group
669				return err
670			}
671			group = t.getApplyBulkRetries(group)
672			if len(group) > 0 && len(idempotentRetryCodes) > 0 {
673				// We have at least one mutation that needs to be retried.
674				// Return an arbitrary error that is retryable according to callOptions.
675				return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk")
676			}
677			return nil
678		}, retryOptions...)
679		if err != nil {
680			return nil, err
681		}
682	}
683
684	// All the errors are accumulated into an array and returned, interspersed with nils for successful
685	// entries. The absence of any errors means we should return nil.
686	var foundErr bool
687	for _, entry := range origEntries {
688		if entry.Err != nil {
689			foundErr = true
690		}
691		errs = append(errs, entry.Err)
692	}
693	if foundErr {
694		return errs, nil
695	}
696	return nil, nil
697}
698
699// getApplyBulkRetries returns the entries that need to be retried
700func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr {
701	var retryEntries []*entryErr
702	for _, entry := range entries {
703		err := entry.Err
704		if err != nil && isIdempotentRetryCode[grpc.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) {
705			// There was an error and the entry is retryable.
706			retryEntries = append(retryEntries, entry)
707		}
708	}
709	return retryEntries
710}
711
712// doApplyBulk does the work of a single ApplyBulk invocation
713func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error {
714	after := func(res proto.Message) {
715		for _, o := range opts {
716			o.after(res)
717		}
718	}
719
720	entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
721	for i, entryErr := range entryErrs {
722		entries[i] = entryErr.Entry
723	}
724	req := &btpb.MutateRowsRequest{
725		TableName:    t.c.fullTableName(t.table),
726		AppProfileId: t.c.appProfile,
727		Entries:      entries,
728	}
729	stream, err := t.c.client.MutateRows(ctx, req)
730	if err != nil {
731		return err
732	}
733	for {
734		res, err := stream.Recv()
735		if err == io.EOF {
736			break
737		}
738		if err != nil {
739			return err
740		}
741
742		for i, entry := range res.Entries {
743			s := entry.Status
744			if s.Code == int32(codes.OK) {
745				entryErrs[i].Err = nil
746			} else {
747				entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message)
748			}
749		}
750		after(res)
751	}
752	return nil
753}
754
755// groupEntries groups entries into groups of a specified size without breaking up
756// individual entries.
757func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
758	var (
759		res   [][]*entryErr
760		start int
761		gmuts int
762	)
763	addGroup := func(end int) {
764		if end-start > 0 {
765			res = append(res, entries[start:end])
766			start = end
767			gmuts = 0
768		}
769	}
770	for i, e := range entries {
771		emuts := len(e.Entry.Mutations)
772		if gmuts+emuts > maxSize {
773			addGroup(i)
774		}
775		gmuts += emuts
776	}
777	addGroup(len(entries))
778	return res
779}
780
// Timestamp is in units of microseconds since 1 January 1970.
type Timestamp int64

// ServerTime is a specific Timestamp that may be passed to (*Mutation).Set.
// It indicates that the server's timestamp should be used.
const ServerTime Timestamp = -1

// Time converts a time.Time into a Timestamp.
// Sub-microsecond precision is discarded.
func Time(t time.Time) Timestamp {
	const nanosPerMicro = 1000
	return Timestamp(t.UnixNano() / nanosPerMicro)
}

// Now returns the Timestamp representation of the current time on the client.
func Now() Timestamp {
	return Time(time.Now())
}

// Time converts a Timestamp into a time.Time.
func (ts Timestamp) Time() time.Time {
	const nanosPerMicro = 1000
	return time.Unix(0, int64(ts)*nanosPerMicro)
}

// TruncateToMilliseconds truncates a Timestamp to millisecond granularity,
// which is currently the only granularity supported. ServerTime is returned
// unchanged. Truncation is toward zero (Go integer division).
func (ts Timestamp) TruncateToMilliseconds() Timestamp {
	if ts == ServerTime {
		return ts
	}
	const microsPerMilli = 1000
	return ts / microsPerMilli * microsPerMilli
}
805
806// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row.
807// It returns the newly written cells.
808func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) {
809	ctx = mergeOutgoingMetadata(ctx, t.md)
810	req := &btpb.ReadModifyWriteRowRequest{
811		TableName:    t.c.fullTableName(t.table),
812		AppProfileId: t.c.appProfile,
813		RowKey:       []byte(row),
814		Rules:        m.ops,
815	}
816	res, err := t.c.client.ReadModifyWriteRow(ctx, req)
817	if err != nil {
818		return nil, err
819	}
820	if res.Row == nil {
821		return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil")
822	}
823	r := make(Row)
824	for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family
825		decodeFamilyProto(r, row, fam)
826	}
827	return r, nil
828}
829
830// ReadModifyWrite represents a set of operations on a single row of a table.
831// It is like Mutation but for non-idempotent changes.
832// When applied, these operations operate on the latest values of the row's cells,
833// and result in a new value being written to the relevant cell with a timestamp
834// that is max(existing timestamp, current server time).
835//
836// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will
837// be executed serially by the server.
838type ReadModifyWrite struct {
839	ops []*btpb.ReadModifyWriteRule
840}
841
842// NewReadModifyWrite returns a new ReadModifyWrite.
843func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) }
844
845// AppendValue appends a value to a specific cell's value.
846// If the cell is unset, it will be treated as an empty value.
847func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) {
848	m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
849		FamilyName:      family,
850		ColumnQualifier: []byte(column),
851		Rule:            &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v},
852	})
853}
854
855// Increment interprets the value in a specific cell as a 64-bit big-endian signed integer,
856// and adds a value to it. If the cell is unset, it will be treated as zero.
857// If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite
858// operation will fail.
859func (m *ReadModifyWrite) Increment(family, column string, delta int64) {
860	m.ops = append(m.ops, &btpb.ReadModifyWriteRule{
861		FamilyName:      family,
862		ColumnQualifier: []byte(column),
863		Rule:            &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta},
864	})
865}
866
867// mergeOutgoingMetadata returns a context populated by the existing outgoing metadata,
868// if any, joined with internal metadata.
869func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context {
870	mdCopy, _ := metadata.FromOutgoingContext(ctx)
871	return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md))
872}
873
874// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of
875// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces.
876func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) {
877	ctx = mergeOutgoingMetadata(ctx, t.md)
878	var sampledRowKeys []string
879	err := gax.Invoke(ctx, func(ctx context.Context) error {
880		sampledRowKeys = nil
881		req := &btpb.SampleRowKeysRequest{
882			TableName:    t.c.fullTableName(t.table),
883			AppProfileId: t.c.appProfile,
884		}
885		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
886		defer cancel()
887
888		stream, err := t.c.client.SampleRowKeys(ctx, req)
889		if err != nil {
890			return err
891		}
892		for {
893			res, err := stream.Recv()
894			if err == io.EOF {
895				break
896			}
897			if err != nil {
898				return err
899			}
900
901			key := string(res.RowKey)
902			if key == "" {
903				continue
904			}
905
906			sampledRowKeys = append(sampledRowKeys, key)
907		}
908		return nil
909	}, retryOptions...)
910	return sampledRowKeys, err
911}
912