/*
Copyright 2015 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bigtable // import "cloud.google.com/go/bigtable"

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	btopt "cloud.google.com/go/bigtable/internal/option"
	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/proto"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	gtransport "google.golang.org/api/transport/grpc"
	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// prodAddr is the endpoint handed to the default client options when a
// client is created.
const prodAddr = "bigtable.googleapis.com:443"

// Client is a client for reading and writing data to tables in an instance.
//
// A Client is safe to use concurrently, except for its Close method.
type Client struct {
	// connPool is the gRPC connection pool dialed at construction time; it
	// is what Close shuts down.
	connPool gtransport.ConnPool
	// client is the Bigtable data API stub built on top of connPool.
	client btpb.BigtableClient
	// project and instance identify the instance and are used to build
	// fully-qualified table names.
	project, instance string
	// appProfile is copied from ClientConfig.AppProfile and sent as the
	// AppProfileId of every request.
	appProfile string
}

// ClientConfig has configurations for the client.
type ClientConfig struct {
	// The id of the app profile to associate with all data operations sent from this client.
	// If unspecified, the default app profile for the instance will be used.
	AppProfile string
}

// NewClient creates a new Client for a given project and instance.
// The default ClientConfig will be used.
61func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) { 62 return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...) 63} 64 65// NewClientWithConfig creates a new client with the given config. 66func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) { 67 o, err := btopt.DefaultClientOptions(prodAddr, Scope, clientUserAgent) 68 if err != nil { 69 return nil, err 70 } 71 // Add gRPC client interceptors to supply Google client information. No external interceptors are passed. 72 o = append(o, btopt.ClientInterceptorOptions(nil, nil)...) 73 74 // Default to a small connection pool that can be overridden. 75 o = append(o, 76 option.WithGRPCConnectionPool(4), 77 // Set the max size to correspond to server-side limits. 78 option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))), 79 // TODO(grpc/grpc-go#1388) using connection pool without WithBlock 80 // can cause RPCs to fail randomly. We can delete this after the issue is fixed. 81 option.WithGRPCDialOption(grpc.WithBlock())) 82 o = append(o, opts...) 83 connPool, err := gtransport.DialPool(ctx, o...) 84 if err != nil { 85 return nil, fmt.Errorf("dialing: %v", err) 86 } 87 88 return &Client{ 89 connPool: connPool, 90 client: btpb.NewBigtableClient(connPool), 91 project: project, 92 instance: instance, 93 appProfile: config.AppProfile, 94 }, nil 95} 96 97// Close closes the Client. 
98func (c *Client) Close() error { 99 return c.connPool.Close() 100} 101 102var ( 103 idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted} 104 isIdempotentRetryCode = make(map[codes.Code]bool) 105 retryOptions = []gax.CallOption{ 106 gax.WithRetry(func() gax.Retryer { 107 return gax.OnCodes(idempotentRetryCodes, gax.Backoff{ 108 Initial: 100 * time.Millisecond, 109 Max: 2 * time.Second, 110 Multiplier: 1.2, 111 }) 112 }), 113 } 114) 115 116func init() { 117 for _, code := range idempotentRetryCodes { 118 isIdempotentRetryCode[code] = true 119 } 120} 121 122func (c *Client) fullTableName(table string) string { 123 return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table) 124} 125 126// mergeOutgoingMetadata returns a context populated by the existing outgoing 127// metadata merged with the provided mds. 128func mergeOutgoingMetadata(ctx context.Context, mds ...metadata.MD) context.Context { 129 ctxMD, _ := metadata.FromOutgoingContext(ctx) 130 // The ordering matters, hence why ctxMD comes first. 131 allMDs := append([]metadata.MD{ctxMD}, mds...) 132 return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...)) 133} 134 135// A Table refers to a table. 136// 137// A Table is safe to use concurrently. 138type Table struct { 139 c *Client 140 table string 141 142 // Metadata to be sent with each request. 143 md metadata.MD 144} 145 146// Open opens a table. 147func (c *Client) Open(table string) *Table { 148 return &Table{ 149 c: c, 150 table: table, 151 md: metadata.Pairs(resourcePrefixHeader, c.fullTableName(table)), 152 } 153} 154 155// TODO(dsymonds): Read method that returns a sequence of ReadItems. 156 157// ReadRows reads rows from a table. f is called for each row. 158// If f returns false, the stream is shut down and ReadRows returns. 159// f owns its argument, and f is called serially in order by row key. 160// 161// By default, the yielded rows will contain all values in all cells. 
162// Use RowFilter to limit the cells returned. 163func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) { 164 ctx = mergeOutgoingMetadata(ctx, t.md) 165 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows") 166 defer func() { trace.EndSpan(ctx, err) }() 167 168 var prevRowKey string 169 attrMap := make(map[string]interface{}) 170 err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 171 if !arg.valid() { 172 // Empty row set, no need to make an API call. 173 // NOTE: we must return early if arg == RowList{} because reading 174 // an empty RowList from bigtable returns all rows from that table. 175 return nil 176 } 177 req := &btpb.ReadRowsRequest{ 178 TableName: t.c.fullTableName(t.table), 179 AppProfileId: t.c.appProfile, 180 Rows: arg.proto(), 181 } 182 for _, opt := range opts { 183 opt.set(req) 184 } 185 ctx, cancel := context.WithCancel(ctx) // for aborting the stream 186 defer cancel() 187 188 startTime := time.Now() 189 stream, err := t.c.client.ReadRows(ctx, req) 190 if err != nil { 191 return err 192 } 193 cr := newChunkReader() 194 for { 195 res, err := stream.Recv() 196 if err == io.EOF { 197 break 198 } 199 if err != nil { 200 // Reset arg for next Invoke call. 201 arg = arg.retainRowsAfter(prevRowKey) 202 attrMap["rowKey"] = prevRowKey 203 attrMap["error"] = err.Error() 204 attrMap["time_secs"] = time.Since(startTime).Seconds() 205 trace.TracePrintf(ctx, attrMap, "Retry details in ReadRows") 206 return err 207 } 208 attrMap["time_secs"] = time.Since(startTime).Seconds() 209 attrMap["rowCount"] = len(res.Chunks) 210 trace.TracePrintf(ctx, attrMap, "Details in ReadRows") 211 212 for _, cc := range res.Chunks { 213 row, err := cr.Process(cc) 214 if err != nil { 215 // No need to prepare for a retry, this is an unretryable error. 216 return err 217 } 218 if row == nil { 219 continue 220 } 221 prevRowKey = row.Key() 222 if !f(row) { 223 // Cancel and drain stream. 
224 cancel() 225 for { 226 if _, err := stream.Recv(); err != nil { 227 // The stream has ended. We don't return an error 228 // because the caller has intentionally interrupted the scan. 229 return nil 230 } 231 } 232 } 233 } 234 if err := cr.Close(); err != nil { 235 // No need to prepare for a retry, this is an unretryable error. 236 return err 237 } 238 } 239 return err 240 }, retryOptions...) 241 242 return err 243} 244 245// ReadRow is a convenience implementation of a single-row reader. 246// A missing row will return a zero-length map and a nil error. 247func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) { 248 var r Row 249 err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool { 250 r = rr 251 return true 252 }, opts...) 253 return r, err 254} 255 256// decodeFamilyProto adds the cell data from f to the given row. 257func decodeFamilyProto(r Row, row string, f *btpb.Family) { 258 fam := f.Name // does not have colon 259 for _, col := range f.Columns { 260 for _, cell := range col.Cells { 261 ri := ReadItem{ 262 Row: row, 263 Column: fam + ":" + string(col.Qualifier), 264 Timestamp: Timestamp(cell.TimestampMicros), 265 Value: cell.Value, 266 } 267 r[fam] = append(r[fam], ri) 268 } 269 } 270} 271 272// RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList. 273// The serialized size of the RowSet must be no larger than 1MiB. 274type RowSet interface { 275 proto() *btpb.RowSet 276 277 // retainRowsAfter returns a new RowSet that does not include the 278 // given row key or any row key lexicographically less than it. 279 retainRowsAfter(lastRowKey string) RowSet 280 281 // Valid reports whether this set can cover at least one row. 282 valid() bool 283} 284 285// RowList is a sequence of row keys. 
286type RowList []string 287 288func (r RowList) proto() *btpb.RowSet { 289 keys := make([][]byte, len(r)) 290 for i, row := range r { 291 keys[i] = []byte(row) 292 } 293 return &btpb.RowSet{RowKeys: keys} 294} 295 296func (r RowList) retainRowsAfter(lastRowKey string) RowSet { 297 var retryKeys RowList 298 for _, key := range r { 299 if key > lastRowKey { 300 retryKeys = append(retryKeys, key) 301 } 302 } 303 return retryKeys 304} 305 306func (r RowList) valid() bool { 307 return len(r) > 0 308} 309 310// A RowRange is a half-open interval [Start, Limit) encompassing 311// all the rows with keys at least as large as Start, and less than Limit. 312// (Bigtable string comparison is the same as Go's.) 313// A RowRange can be unbounded, encompassing all keys at least as large as Start. 314type RowRange struct { 315 start string 316 limit string 317} 318 319// NewRange returns the new RowRange [begin, end). 320func NewRange(begin, end string) RowRange { 321 return RowRange{ 322 start: begin, 323 limit: end, 324 } 325} 326 327// Unbounded tests whether a RowRange is unbounded. 328func (r RowRange) Unbounded() bool { 329 return r.limit == "" 330} 331 332// Contains says whether the RowRange contains the key. 333func (r RowRange) Contains(row string) bool { 334 return r.start <= row && (r.limit == "" || r.limit > row) 335} 336 337// String provides a printable description of a RowRange. 
338func (r RowRange) String() string { 339 a := strconv.Quote(r.start) 340 if r.Unbounded() { 341 return fmt.Sprintf("[%s,∞)", a) 342 } 343 return fmt.Sprintf("[%s,%q)", a, r.limit) 344} 345 346func (r RowRange) proto() *btpb.RowSet { 347 rr := &btpb.RowRange{ 348 StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}, 349 } 350 if !r.Unbounded() { 351 rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)} 352 } 353 return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}} 354} 355 356func (r RowRange) retainRowsAfter(lastRowKey string) RowSet { 357 if lastRowKey == "" || lastRowKey < r.start { 358 return r 359 } 360 // Set the beginning of the range to the row after the last scanned. 361 start := lastRowKey + "\x00" 362 if r.Unbounded() { 363 return InfiniteRange(start) 364 } 365 return NewRange(start, r.limit) 366} 367 368func (r RowRange) valid() bool { 369 return r.Unbounded() || r.start < r.limit 370} 371 372// RowRangeList is a sequence of RowRanges representing the union of the ranges. 373type RowRangeList []RowRange 374 375func (r RowRangeList) proto() *btpb.RowSet { 376 ranges := make([]*btpb.RowRange, len(r)) 377 for i, rr := range r { 378 // RowRange.proto() returns a RowSet with a single element RowRange array 379 ranges[i] = rr.proto().RowRanges[0] 380 } 381 return &btpb.RowSet{RowRanges: ranges} 382} 383 384func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet { 385 if lastRowKey == "" { 386 return r 387 } 388 // Return a list of any range that has not yet been completely processed 389 var ranges RowRangeList 390 for _, rr := range r { 391 retained := rr.retainRowsAfter(lastRowKey) 392 if retained.valid() { 393 ranges = append(ranges, retained.(RowRange)) 394 } 395 } 396 return ranges 397} 398 399func (r RowRangeList) valid() bool { 400 for _, rr := range r { 401 if rr.valid() { 402 return true 403 } 404 } 405 return false 406} 407 408// SingleRow returns a RowSet for reading a single row. 
409func SingleRow(row string) RowSet { 410 return RowList{row} 411} 412 413// PrefixRange returns a RowRange consisting of all keys starting with the prefix. 414func PrefixRange(prefix string) RowRange { 415 return RowRange{ 416 start: prefix, 417 limit: prefixSuccessor(prefix), 418 } 419} 420 421// InfiniteRange returns the RowRange consisting of all keys at least as 422// large as start. 423func InfiniteRange(start string) RowRange { 424 return RowRange{ 425 start: start, 426 limit: "", 427 } 428} 429 430// prefixSuccessor returns the lexically smallest string greater than the 431// prefix, if it exists, or "" otherwise. In either case, it is the string 432// needed for the Limit of a RowRange. 433func prefixSuccessor(prefix string) string { 434 if prefix == "" { 435 return "" // infinite range 436 } 437 n := len(prefix) 438 for n--; n >= 0 && prefix[n] == '\xff'; n-- { 439 } 440 if n == -1 { 441 return "" 442 } 443 ans := []byte(prefix[:n]) 444 ans = append(ans, prefix[n]+1) 445 return string(ans) 446} 447 448// A ReadOption is an optional argument to ReadRows. 449type ReadOption interface { 450 set(req *btpb.ReadRowsRequest) 451} 452 453// RowFilter returns a ReadOption that applies f to the contents of read rows. 454// 455// If multiple RowFilters are provided, only the last is used. To combine filters, 456// use ChainFilters or InterleaveFilters instead. 457func RowFilter(f Filter) ReadOption { return rowFilter{f} } 458 459type rowFilter struct{ f Filter } 460 461func (rf rowFilter) set(req *btpb.ReadRowsRequest) { req.Filter = rf.f.proto() } 462 463// LimitRows returns a ReadOption that will limit the number of rows to be read. 464func LimitRows(limit int64) ReadOption { return limitRows{limit} } 465 466type limitRows struct{ limit int64 } 467 468func (lr limitRows) set(req *btpb.ReadRowsRequest) { req.RowsLimit = lr.limit } 469 470// mutationsAreRetryable returns true if all mutations are idempotent 471// and therefore retryable. 
A mutation is idempotent iff all cell timestamps 472// have an explicit timestamp set and do not rely on the timestamp being set on the server. 473func mutationsAreRetryable(muts []*btpb.Mutation) bool { 474 serverTime := int64(ServerTime) 475 for _, mut := range muts { 476 setCell := mut.GetSetCell() 477 if setCell != nil && setCell.TimestampMicros == serverTime { 478 return false 479 } 480 } 481 return true 482} 483 484const maxMutations = 100000 485 486// Apply mutates a row atomically. A mutation must contain at least one 487// operation and at most 100000 operations. 488func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) { 489 ctx = mergeOutgoingMetadata(ctx, t.md) 490 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply") 491 defer func() { trace.EndSpan(ctx, err) }() 492 493 after := func(res proto.Message) { 494 for _, o := range opts { 495 o.after(res) 496 } 497 } 498 499 var callOptions []gax.CallOption 500 if m.cond == nil { 501 req := &btpb.MutateRowRequest{ 502 TableName: t.c.fullTableName(t.table), 503 AppProfileId: t.c.appProfile, 504 RowKey: []byte(row), 505 Mutations: m.ops, 506 } 507 if mutationsAreRetryable(m.ops) { 508 callOptions = retryOptions 509 } 510 var res *btpb.MutateRowResponse 511 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 512 var err error 513 res, err = t.c.client.MutateRow(ctx, req) 514 return err 515 }, callOptions...) 
516 if err == nil { 517 after(res) 518 } 519 return err 520 } 521 522 req := &btpb.CheckAndMutateRowRequest{ 523 TableName: t.c.fullTableName(t.table), 524 AppProfileId: t.c.appProfile, 525 RowKey: []byte(row), 526 PredicateFilter: m.cond.proto(), 527 } 528 if m.mtrue != nil { 529 if m.mtrue.cond != nil { 530 return errors.New("bigtable: conditional mutations cannot be nested") 531 } 532 req.TrueMutations = m.mtrue.ops 533 } 534 if m.mfalse != nil { 535 if m.mfalse.cond != nil { 536 return errors.New("bigtable: conditional mutations cannot be nested") 537 } 538 req.FalseMutations = m.mfalse.ops 539 } 540 if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) { 541 callOptions = retryOptions 542 } 543 var cmRes *btpb.CheckAndMutateRowResponse 544 err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 545 var err error 546 cmRes, err = t.c.client.CheckAndMutateRow(ctx, req) 547 return err 548 }, callOptions...) 549 if err == nil { 550 after(cmRes) 551 } 552 return err 553} 554 555// An ApplyOption is an optional argument to Apply. 556type ApplyOption interface { 557 after(res proto.Message) 558} 559 560type applyAfterFunc func(res proto.Message) 561 562func (a applyAfterFunc) after(res proto.Message) { a(res) } 563 564// GetCondMutationResult returns an ApplyOption that reports whether the conditional 565// mutation's condition matched. 566func GetCondMutationResult(matched *bool) ApplyOption { 567 return applyAfterFunc(func(res proto.Message) { 568 if res, ok := res.(*btpb.CheckAndMutateRowResponse); ok { 569 *matched = res.PredicateMatched 570 } 571 }) 572} 573 574// Mutation represents a set of changes for a single row of a table. 575type Mutation struct { 576 ops []*btpb.Mutation 577 578 // for conditional mutations 579 cond Filter 580 mtrue, mfalse *Mutation 581} 582 583// NewMutation returns a new mutation. 
584func NewMutation() *Mutation { 585 return new(Mutation) 586} 587 588// NewCondMutation returns a conditional mutation. 589// The given row filter determines which mutation is applied: 590// If the filter matches any cell in the row, mtrue is applied; 591// otherwise, mfalse is applied. 592// Either given mutation may be nil. 593// 594// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will 595// be executed serially by the server. 596func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation { 597 return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse} 598} 599 600// Set sets a value in a specified column, with the given timestamp. 601// The timestamp will be truncated to millisecond granularity. 602// A timestamp of ServerTime means to use the server timestamp. 603func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) { 604 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ 605 FamilyName: family, 606 ColumnQualifier: []byte(column), 607 TimestampMicros: int64(ts.TruncateToMilliseconds()), 608 Value: value, 609 }}}) 610} 611 612// DeleteCellsInColumn will delete all the cells whose columns are family:column. 613func (m *Mutation) DeleteCellsInColumn(family, column string) { 614 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ 615 FamilyName: family, 616 ColumnQualifier: []byte(column), 617 }}}) 618} 619 620// DeleteTimestampRange deletes all cells whose columns are family:column 621// and whose timestamps are in the half-open interval [start, end). 622// If end is zero, it will be interpreted as infinity. 623// The timestamps will be truncated to millisecond granularity. 
624func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) { 625 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ 626 FamilyName: family, 627 ColumnQualifier: []byte(column), 628 TimeRange: &btpb.TimestampRange{ 629 StartTimestampMicros: int64(start.TruncateToMilliseconds()), 630 EndTimestampMicros: int64(end.TruncateToMilliseconds()), 631 }, 632 }}}) 633} 634 635// DeleteCellsInFamily will delete all the cells whose columns are family:*. 636func (m *Mutation) DeleteCellsInFamily(family string) { 637 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{ 638 FamilyName: family, 639 }}}) 640} 641 642// DeleteRow deletes the entire row. 643func (m *Mutation) DeleteRow() { 644 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}}) 645} 646 647// entryErr is a container that combines an entry with the error that was returned for it. 648// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed. 649type entryErr struct { 650 Entry *btpb.MutateRowsRequest_Entry 651 Err error 652} 653 654// ApplyBulk applies multiple Mutations, up to a maximum of 100,000. 655// Each mutation is individually applied atomically, 656// but the set of mutations may be applied in any order. 657// 658// Two types of failures may occur. If the entire process 659// fails, (nil, err) will be returned. If specific mutations 660// fail to apply, ([]err, nil) will be returned, and the errors 661// will correspond to the relevant rowKeys/muts arguments. 662// 663// Conditional mutations cannot be applied in bulk and providing one will result in an error. 
664func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) { 665 ctx = mergeOutgoingMetadata(ctx, t.md) 666 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk") 667 defer func() { trace.EndSpan(ctx, err) }() 668 669 if len(rowKeys) != len(muts) { 670 return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts)) 671 } 672 673 origEntries := make([]*entryErr, len(rowKeys)) 674 for i, key := range rowKeys { 675 mut := muts[i] 676 if mut.cond != nil { 677 return nil, errors.New("conditional mutations cannot be applied in bulk") 678 } 679 origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}} 680 } 681 682 for _, group := range groupEntries(origEntries, maxMutations) { 683 attrMap := make(map[string]interface{}) 684 err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 685 attrMap["rowCount"] = len(group) 686 trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") 687 err := t.doApplyBulk(ctx, group, opts...) 688 if err != nil { 689 // We want to retry the entire request with the current group 690 return err 691 } 692 group = t.getApplyBulkRetries(group) 693 if len(group) > 0 && len(idempotentRetryCodes) > 0 { 694 // We have at least one mutation that needs to be retried. 695 // Return an arbitrary error that is retryable according to callOptions. 696 return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk") 697 } 698 return nil 699 }, retryOptions...) 700 if err != nil { 701 return nil, err 702 } 703 } 704 705 // All the errors are accumulated into an array and returned, interspersed with nils for successful 706 // entries. The absence of any errors means we should return nil. 
707 var foundErr bool 708 for _, entry := range origEntries { 709 if entry.Err != nil { 710 foundErr = true 711 } 712 errs = append(errs, entry.Err) 713 } 714 if foundErr { 715 return errs, nil 716 } 717 return nil, nil 718} 719 720// getApplyBulkRetries returns the entries that need to be retried 721func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr { 722 var retryEntries []*entryErr 723 for _, entry := range entries { 724 err := entry.Err 725 if err != nil && isIdempotentRetryCode[status.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) { 726 // There was an error and the entry is retryable. 727 retryEntries = append(retryEntries, entry) 728 } 729 } 730 return retryEntries 731} 732 733// doApplyBulk does the work of a single ApplyBulk invocation 734func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error { 735 after := func(res proto.Message) { 736 for _, o := range opts { 737 o.after(res) 738 } 739 } 740 741 entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) 742 for i, entryErr := range entryErrs { 743 entries[i] = entryErr.Entry 744 } 745 req := &btpb.MutateRowsRequest{ 746 TableName: t.c.fullTableName(t.table), 747 AppProfileId: t.c.appProfile, 748 Entries: entries, 749 } 750 stream, err := t.c.client.MutateRows(ctx, req) 751 if err != nil { 752 return err 753 } 754 for { 755 res, err := stream.Recv() 756 if err == io.EOF { 757 break 758 } 759 if err != nil { 760 return err 761 } 762 763 for i, entry := range res.Entries { 764 s := entry.Status 765 if s.Code == int32(codes.OK) { 766 entryErrs[i].Err = nil 767 } else { 768 entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message) 769 } 770 } 771 after(res) 772 } 773 return nil 774} 775 776// groupEntries groups entries into groups of a specified size without breaking up 777// individual entries. 
778func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr { 779 var ( 780 res [][]*entryErr 781 start int 782 gmuts int 783 ) 784 addGroup := func(end int) { 785 if end-start > 0 { 786 res = append(res, entries[start:end]) 787 start = end 788 gmuts = 0 789 } 790 } 791 for i, e := range entries { 792 emuts := len(e.Entry.Mutations) 793 if gmuts+emuts > maxSize { 794 addGroup(i) 795 } 796 gmuts += emuts 797 } 798 addGroup(len(entries)) 799 return res 800} 801 802// Timestamp is in units of microseconds since 1 January 1970. 803type Timestamp int64 804 805// ServerTime is a specific Timestamp that may be passed to (*Mutation).Set. 806// It indicates that the server's timestamp should be used. 807const ServerTime Timestamp = -1 808 809// Time converts a time.Time into a Timestamp. 810func Time(t time.Time) Timestamp { return Timestamp(t.UnixNano() / 1e3) } 811 812// Now returns the Timestamp representation of the current time on the client. 813func Now() Timestamp { return Time(time.Now()) } 814 815// Time converts a Timestamp into a time.Time. 816func (ts Timestamp) Time() time.Time { return time.Unix(int64(ts)/1e6, int64(ts)%1e6*1e3) } 817 818// TruncateToMilliseconds truncates a Timestamp to millisecond granularity, 819// which is currently the only granularity supported. 820func (ts Timestamp) TruncateToMilliseconds() Timestamp { 821 if ts == ServerTime { 822 return ts 823 } 824 return ts - ts%1000 825} 826 827// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row. 828// It returns the newly written cells. 
829func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { 830 ctx = mergeOutgoingMetadata(ctx, t.md) 831 req := &btpb.ReadModifyWriteRowRequest{ 832 TableName: t.c.fullTableName(t.table), 833 AppProfileId: t.c.appProfile, 834 RowKey: []byte(row), 835 Rules: m.ops, 836 } 837 res, err := t.c.client.ReadModifyWriteRow(ctx, req) 838 if err != nil { 839 return nil, err 840 } 841 if res.Row == nil { 842 return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil") 843 } 844 r := make(Row) 845 for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family 846 decodeFamilyProto(r, row, fam) 847 } 848 return r, nil 849} 850 851// ReadModifyWrite represents a set of operations on a single row of a table. 852// It is like Mutation but for non-idempotent changes. 853// When applied, these operations operate on the latest values of the row's cells, 854// and result in a new value being written to the relevant cell with a timestamp 855// that is max(existing timestamp, current server time). 856// 857// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will 858// be executed serially by the server. 859type ReadModifyWrite struct { 860 ops []*btpb.ReadModifyWriteRule 861} 862 863// NewReadModifyWrite returns a new ReadModifyWrite. 864func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) } 865 866// AppendValue appends a value to a specific cell's value. 867// If the cell is unset, it will be treated as an empty value. 868func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) { 869 m.ops = append(m.ops, &btpb.ReadModifyWriteRule{ 870 FamilyName: family, 871 ColumnQualifier: []byte(column), 872 Rule: &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v}, 873 }) 874} 875 876// Increment interprets the value in a specific cell as a 64-bit big-endian signed integer, 877// and adds a value to it. If the cell is unset, it will be treated as zero. 
878// If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite 879// operation will fail. 880func (m *ReadModifyWrite) Increment(family, column string, delta int64) { 881 m.ops = append(m.ops, &btpb.ReadModifyWriteRule{ 882 FamilyName: family, 883 ColumnQualifier: []byte(column), 884 Rule: &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta}, 885 }) 886} 887 888// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of 889// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces. 890func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { 891 ctx = mergeOutgoingMetadata(ctx, t.md) 892 var sampledRowKeys []string 893 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 894 sampledRowKeys = nil 895 req := &btpb.SampleRowKeysRequest{ 896 TableName: t.c.fullTableName(t.table), 897 AppProfileId: t.c.appProfile, 898 } 899 ctx, cancel := context.WithCancel(ctx) // for aborting the stream 900 defer cancel() 901 902 stream, err := t.c.client.SampleRowKeys(ctx, req) 903 if err != nil { 904 return err 905 } 906 for { 907 res, err := stream.Recv() 908 if err == io.EOF { 909 break 910 } 911 if err != nil { 912 return err 913 } 914 915 key := string(res.RowKey) 916 if key == "" { 917 continue 918 } 919 920 sampledRowKeys = append(sampledRowKeys, key) 921 } 922 return nil 923 }, retryOptions...) 924 return sampledRowKeys, err 925} 926