/*
Copyright 2015 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bigtable // import "cloud.google.com/go/bigtable"

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/url"
	"strconv"
	"time"

	btopt "cloud.google.com/go/bigtable/internal/option"
	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/proto"
	gax "github.com/googleapis/gax-go/v2"
	"google.golang.org/api/option"
	"google.golang.org/api/option/internaloption"
	gtransport "google.golang.org/api/transport/grpc"
	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// prodAddr and mtlsProdAddr are the service addresses handed to
// btopt.DefaultClientOptions when NewClientWithConfig builds its default
// dial options.
const prodAddr = "bigtable.googleapis.com:443"
const mtlsProdAddr = "bigtable.mtls.googleapis.com:443"

// Client is a client for reading and writing data to tables in an instance.
//
// A Client is safe to use concurrently, except for its Close method.
type Client struct {
	// connPool is the gRPC connection pool dialed by NewClientWithConfig;
	// Close closes it.
	connPool gtransport.ConnPool
	// client is the generated Bigtable stub bound to connPool.
	client btpb.BigtableClient
	// project and instance are used to build fully qualified table names.
	project, instance string
	// appProfile is copied from ClientConfig.AppProfile and sent with requests.
	appProfile string
}

// ClientConfig has configurations for the client.
type ClientConfig struct {
	// The id of the app profile to associate with all data operations sent from this client.
	// If unspecified, the default app profile for the instance will be used.
	AppProfile string
}

// NewClient creates a new Client for a given project and instance.
// The default ClientConfig will be used.
func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) {
	return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...)
}

// NewClientWithConfig creates a new client with the given config.
//
// The dial options are assembled in a fixed order: the package defaults from
// btopt.DefaultClientOptions, the client interceptors, the connection pool
// size and message size limits, the DirectPath toggle, and finally the
// caller-supplied opts. A failure to build the defaults is returned as is;
// a dial failure is returned prefixed with "dialing: ".
func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) {
	o, err := btopt.DefaultClientOptions(prodAddr, mtlsProdAddr, Scope, clientUserAgent)
	if err != nil {
		return nil, err
	}
	// Add gRPC client interceptors to supply Google client information. No external interceptors are passed.
	o = append(o, btopt.ClientInterceptorOptions(nil, nil)...)

	// Default to a small connection pool that can be overridden.
	o = append(o,
		option.WithGRPCConnectionPool(4),
		// Set the max size to correspond to server-side limits.
		// 1<<28 bytes is 256 MiB, applied to both sent and received messages.
		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))),
	)
	// Attempts direct access to the Bigtable service over gRPC to improve throughput,
	// whether the attempt is allowed is totally controlled by service owner.
	o = append(o, internaloption.EnableDirectPath(true))
	// Caller options go last, after every default set above.
	o = append(o, opts...)
	connPool, err := gtransport.DialPool(ctx, o...)
	if err != nil {
		return nil, fmt.Errorf("dialing: %v", err)
	}

	return &Client{
		connPool:   connPool,
		client:     btpb.NewBigtableClient(connPool),
		project:    project,
		instance:   instance,
		appProfile: config.AppProfile,
	}, nil
}

// Close closes the Client.
102func (c *Client) Close() error { 103 return c.connPool.Close() 104} 105 106var ( 107 idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted} 108 isIdempotentRetryCode = make(map[codes.Code]bool) 109 retryOptions = []gax.CallOption{ 110 gax.WithRetry(func() gax.Retryer { 111 return gax.OnCodes(idempotentRetryCodes, gax.Backoff{ 112 Initial: 100 * time.Millisecond, 113 Max: 2 * time.Second, 114 Multiplier: 1.2, 115 }) 116 }), 117 } 118) 119 120func init() { 121 for _, code := range idempotentRetryCodes { 122 isIdempotentRetryCode[code] = true 123 } 124} 125 126func (c *Client) fullTableName(table string) string { 127 return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table) 128} 129 130func (c *Client) requestParamsHeaderValue(table string) string { 131 return fmt.Sprintf("table_name=%s&app_profile=%s", url.QueryEscape(c.fullTableName(table)), url.QueryEscape(c.appProfile)) 132} 133 134// mergeOutgoingMetadata returns a context populated by the existing outgoing 135// metadata merged with the provided mds. 136func mergeOutgoingMetadata(ctx context.Context, mds ...metadata.MD) context.Context { 137 ctxMD, _ := metadata.FromOutgoingContext(ctx) 138 // The ordering matters, hence why ctxMD comes first. 139 allMDs := append([]metadata.MD{ctxMD}, mds...) 140 return metadata.NewOutgoingContext(ctx, metadata.Join(allMDs...)) 141} 142 143// A Table refers to a table. 144// 145// A Table is safe to use concurrently. 146type Table struct { 147 c *Client 148 table string 149 150 // Metadata to be sent with each request. 151 md metadata.MD 152} 153 154// Open opens a table. 155func (c *Client) Open(table string) *Table { 156 return &Table{ 157 c: c, 158 table: table, 159 md: metadata.Pairs(resourcePrefixHeader, c.fullTableName(table), requestParamsHeader, c.requestParamsHeaderValue(table)), 160 } 161} 162 163// TODO(dsymonds): Read method that returns a sequence of ReadItems. 

// ReadRows reads rows from a table. f is called for each row.
// If f returns false, the stream is shut down and ReadRows returns.
// f owns its argument, and f is called serially in order by row key.
//
// By default, the yielded rows will contain all values in all cells.
// Use RowFilter to limit the cells returned.
//
// The read runs under gax.Invoke with retryOptions. When the stream fails
// part-way, arg is narrowed to the rows after the last one handed to f, so a
// retried attempt asks only for the remaining rows. An arg that cannot cover
// any row results in no API call and a nil error.
func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) {
	ctx = mergeOutgoingMetadata(ctx, t.md)
	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows")
	// The span is ended with the final value of the named result err.
	defer func() { trace.EndSpan(ctx, err) }()

	// prevRowKey is the key of the most recent row passed to f; it survives
	// across retry attempts because it lives outside the closure.
	var prevRowKey string
	// attrMap holds the trace attributes and is reused for every trace print.
	attrMap := make(map[string]interface{})
	err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error {
		if !arg.valid() {
			// Empty row set, no need to make an API call.
			// NOTE: we must return early if arg == RowList{} because reading
			// an empty RowList from bigtable returns all rows from that table.
			return nil
		}
		req := &btpb.ReadRowsRequest{
			TableName:    t.c.fullTableName(t.table),
			AppProfileId: t.c.appProfile,
			Rows:         arg.proto(),
		}
		// Each ReadOption mutates the request in argument order.
		for _, opt := range opts {
			opt.set(req)
		}
		ctx, cancel := context.WithCancel(ctx) // for aborting the stream
		defer cancel()

		startTime := time.Now()
		stream, err := t.c.client.ReadRows(ctx, req)
		if err != nil {
			return err
		}
		cr := newChunkReader()
		for {
			res, err := stream.Recv()
			if err == io.EOF {
				break
			}
			if err != nil {
				// Reset arg for next Invoke call.
				arg = arg.retainRowsAfter(prevRowKey)
				attrMap["rowKey"] = prevRowKey
				attrMap["error"] = err.Error()
				attrMap["time_secs"] = time.Since(startTime).Seconds()
				trace.TracePrintf(ctx, attrMap, "Retry details in ReadRows")
				return err
			}
			attrMap["time_secs"] = time.Since(startTime).Seconds()
			// Note: this records the number of chunks in the response, which
			// is what is traced under the "rowCount" attribute.
			attrMap["rowCount"] = len(res.Chunks)
			trace.TracePrintf(ctx, attrMap, "Details in ReadRows")

			for _, cc := range res.Chunks {
				row, err := cr.Process(cc)
				if err != nil {
					// No need to prepare for a retry, this is an unretryable error.
					return err
				}
				// A nil row means the chunk did not complete a row yet.
				if row == nil {
					continue
				}
				// Record the key before calling f so a later retry resumes
				// after this row.
				prevRowKey = row.Key()
				if !f(row) {
					// Cancel and drain stream.
					cancel()
					for {
						if _, err := stream.Recv(); err != nil {
							// The stream has ended. We don't return an error
							// because the caller has intentionally interrupted the scan.
							return nil
						}
					}
				}
			}
			if err := cr.Close(); err != nil {
				// No need to prepare for a retry, this is an unretryable error.
				return err
			}
		}
		// err here is the variable declared alongside stream above; it is nil
		// because a non-nil value already returned early.
		return err
	}, retryOptions...)

	return err
}

// ReadRow is a convenience implementation of a single-row reader.
// A missing row will return a zero-length map and a nil error.
//
// It calls ReadRows with SingleRow(row) and keeps the last row delivered to
// the callback; opts are forwarded unchanged.
func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) {
	var r Row
	err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool {
		r = rr
		return true
	}, opts...)
	return r, err
}

// decodeFamilyProto adds the cell data from f to the given row.
265func decodeFamilyProto(r Row, row string, f *btpb.Family) { 266 fam := f.Name // does not have colon 267 for _, col := range f.Columns { 268 for _, cell := range col.Cells { 269 ri := ReadItem{ 270 Row: row, 271 Column: fam + ":" + string(col.Qualifier), 272 Timestamp: Timestamp(cell.TimestampMicros), 273 Value: cell.Value, 274 } 275 r[fam] = append(r[fam], ri) 276 } 277 } 278} 279 280// RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList. 281// The serialized size of the RowSet must be no larger than 1MiB. 282type RowSet interface { 283 proto() *btpb.RowSet 284 285 // retainRowsAfter returns a new RowSet that does not include the 286 // given row key or any row key lexicographically less than it. 287 retainRowsAfter(lastRowKey string) RowSet 288 289 // Valid reports whether this set can cover at least one row. 290 valid() bool 291} 292 293// RowList is a sequence of row keys. 294type RowList []string 295 296func (r RowList) proto() *btpb.RowSet { 297 keys := make([][]byte, len(r)) 298 for i, row := range r { 299 keys[i] = []byte(row) 300 } 301 return &btpb.RowSet{RowKeys: keys} 302} 303 304func (r RowList) retainRowsAfter(lastRowKey string) RowSet { 305 var retryKeys RowList 306 for _, key := range r { 307 if key > lastRowKey { 308 retryKeys = append(retryKeys, key) 309 } 310 } 311 return retryKeys 312} 313 314func (r RowList) valid() bool { 315 return len(r) > 0 316} 317 318// A RowRange is a half-open interval [Start, Limit) encompassing 319// all the rows with keys at least as large as Start, and less than Limit. 320// (Bigtable string comparison is the same as Go's.) 321// A RowRange can be unbounded, encompassing all keys at least as large as Start. 322type RowRange struct { 323 start string 324 limit string 325} 326 327// NewRange returns the new RowRange [begin, end). 
328func NewRange(begin, end string) RowRange { 329 return RowRange{ 330 start: begin, 331 limit: end, 332 } 333} 334 335// Unbounded tests whether a RowRange is unbounded. 336func (r RowRange) Unbounded() bool { 337 return r.limit == "" 338} 339 340// Contains says whether the RowRange contains the key. 341func (r RowRange) Contains(row string) bool { 342 return r.start <= row && (r.limit == "" || r.limit > row) 343} 344 345// String provides a printable description of a RowRange. 346func (r RowRange) String() string { 347 a := strconv.Quote(r.start) 348 if r.Unbounded() { 349 return fmt.Sprintf("[%s,∞)", a) 350 } 351 return fmt.Sprintf("[%s,%q)", a, r.limit) 352} 353 354func (r RowRange) proto() *btpb.RowSet { 355 rr := &btpb.RowRange{ 356 StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}, 357 } 358 if !r.Unbounded() { 359 rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)} 360 } 361 return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}} 362} 363 364func (r RowRange) retainRowsAfter(lastRowKey string) RowSet { 365 if lastRowKey == "" || lastRowKey < r.start { 366 return r 367 } 368 // Set the beginning of the range to the row after the last scanned. 369 start := lastRowKey + "\x00" 370 if r.Unbounded() { 371 return InfiniteRange(start) 372 } 373 return NewRange(start, r.limit) 374} 375 376func (r RowRange) valid() bool { 377 return r.Unbounded() || r.start < r.limit 378} 379 380// RowRangeList is a sequence of RowRanges representing the union of the ranges. 
381type RowRangeList []RowRange 382 383func (r RowRangeList) proto() *btpb.RowSet { 384 ranges := make([]*btpb.RowRange, len(r)) 385 for i, rr := range r { 386 // RowRange.proto() returns a RowSet with a single element RowRange array 387 ranges[i] = rr.proto().RowRanges[0] 388 } 389 return &btpb.RowSet{RowRanges: ranges} 390} 391 392func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet { 393 if lastRowKey == "" { 394 return r 395 } 396 // Return a list of any range that has not yet been completely processed 397 var ranges RowRangeList 398 for _, rr := range r { 399 retained := rr.retainRowsAfter(lastRowKey) 400 if retained.valid() { 401 ranges = append(ranges, retained.(RowRange)) 402 } 403 } 404 return ranges 405} 406 407func (r RowRangeList) valid() bool { 408 for _, rr := range r { 409 if rr.valid() { 410 return true 411 } 412 } 413 return false 414} 415 416// SingleRow returns a RowSet for reading a single row. 417func SingleRow(row string) RowSet { 418 return RowList{row} 419} 420 421// PrefixRange returns a RowRange consisting of all keys starting with the prefix. 422func PrefixRange(prefix string) RowRange { 423 return RowRange{ 424 start: prefix, 425 limit: prefixSuccessor(prefix), 426 } 427} 428 429// InfiniteRange returns the RowRange consisting of all keys at least as 430// large as start. 431func InfiniteRange(start string) RowRange { 432 return RowRange{ 433 start: start, 434 limit: "", 435 } 436} 437 438// prefixSuccessor returns the lexically smallest string greater than the 439// prefix, if it exists, or "" otherwise. In either case, it is the string 440// needed for the Limit of a RowRange. 
441func prefixSuccessor(prefix string) string { 442 if prefix == "" { 443 return "" // infinite range 444 } 445 n := len(prefix) 446 for n--; n >= 0 && prefix[n] == '\xff'; n-- { 447 } 448 if n == -1 { 449 return "" 450 } 451 ans := []byte(prefix[:n]) 452 ans = append(ans, prefix[n]+1) 453 return string(ans) 454} 455 456// A ReadOption is an optional argument to ReadRows. 457type ReadOption interface { 458 set(req *btpb.ReadRowsRequest) 459} 460 461// RowFilter returns a ReadOption that applies f to the contents of read rows. 462// 463// If multiple RowFilters are provided, only the last is used. To combine filters, 464// use ChainFilters or InterleaveFilters instead. 465func RowFilter(f Filter) ReadOption { return rowFilter{f} } 466 467type rowFilter struct{ f Filter } 468 469func (rf rowFilter) set(req *btpb.ReadRowsRequest) { req.Filter = rf.f.proto() } 470 471// LimitRows returns a ReadOption that will limit the number of rows to be read. 472func LimitRows(limit int64) ReadOption { return limitRows{limit} } 473 474type limitRows struct{ limit int64 } 475 476func (lr limitRows) set(req *btpb.ReadRowsRequest) { req.RowsLimit = lr.limit } 477 478// mutationsAreRetryable returns true if all mutations are idempotent 479// and therefore retryable. A mutation is idempotent iff all cell timestamps 480// have an explicit timestamp set and do not rely on the timestamp being set on the server. 481func mutationsAreRetryable(muts []*btpb.Mutation) bool { 482 serverTime := int64(ServerTime) 483 for _, mut := range muts { 484 setCell := mut.GetSetCell() 485 if setCell != nil && setCell.TimestampMicros == serverTime { 486 return false 487 } 488 } 489 return true 490} 491 492const maxMutations = 100000 493 494// Apply mutates a row atomically. A mutation must contain at least one 495// operation and at most 100000 operations. 
496func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) { 497 ctx = mergeOutgoingMetadata(ctx, t.md) 498 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply") 499 defer func() { trace.EndSpan(ctx, err) }() 500 501 after := func(res proto.Message) { 502 for _, o := range opts { 503 o.after(res) 504 } 505 } 506 507 var callOptions []gax.CallOption 508 if m.cond == nil { 509 req := &btpb.MutateRowRequest{ 510 TableName: t.c.fullTableName(t.table), 511 AppProfileId: t.c.appProfile, 512 RowKey: []byte(row), 513 Mutations: m.ops, 514 } 515 if mutationsAreRetryable(m.ops) { 516 callOptions = retryOptions 517 } 518 var res *btpb.MutateRowResponse 519 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 520 var err error 521 res, err = t.c.client.MutateRow(ctx, req) 522 return err 523 }, callOptions...) 524 if err == nil { 525 after(res) 526 } 527 return err 528 } 529 530 req := &btpb.CheckAndMutateRowRequest{ 531 TableName: t.c.fullTableName(t.table), 532 AppProfileId: t.c.appProfile, 533 RowKey: []byte(row), 534 PredicateFilter: m.cond.proto(), 535 } 536 if m.mtrue != nil { 537 if m.mtrue.cond != nil { 538 return errors.New("bigtable: conditional mutations cannot be nested") 539 } 540 req.TrueMutations = m.mtrue.ops 541 } 542 if m.mfalse != nil { 543 if m.mfalse.cond != nil { 544 return errors.New("bigtable: conditional mutations cannot be nested") 545 } 546 req.FalseMutations = m.mfalse.ops 547 } 548 if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) { 549 callOptions = retryOptions 550 } 551 var cmRes *btpb.CheckAndMutateRowResponse 552 err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 553 var err error 554 cmRes, err = t.c.client.CheckAndMutateRow(ctx, req) 555 return err 556 }, callOptions...) 557 if err == nil { 558 after(cmRes) 559 } 560 return err 561} 562 563// An ApplyOption is an optional argument to Apply. 
564type ApplyOption interface { 565 after(res proto.Message) 566} 567 568type applyAfterFunc func(res proto.Message) 569 570func (a applyAfterFunc) after(res proto.Message) { a(res) } 571 572// GetCondMutationResult returns an ApplyOption that reports whether the conditional 573// mutation's condition matched. 574func GetCondMutationResult(matched *bool) ApplyOption { 575 return applyAfterFunc(func(res proto.Message) { 576 if res, ok := res.(*btpb.CheckAndMutateRowResponse); ok { 577 *matched = res.PredicateMatched 578 } 579 }) 580} 581 582// Mutation represents a set of changes for a single row of a table. 583type Mutation struct { 584 ops []*btpb.Mutation 585 586 // for conditional mutations 587 cond Filter 588 mtrue, mfalse *Mutation 589} 590 591// NewMutation returns a new mutation. 592func NewMutation() *Mutation { 593 return new(Mutation) 594} 595 596// NewCondMutation returns a conditional mutation. 597// The given row filter determines which mutation is applied: 598// If the filter matches any cell in the row, mtrue is applied; 599// otherwise, mfalse is applied. 600// Either given mutation may be nil. 601// 602// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will 603// be executed serially by the server. 604func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation { 605 return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse} 606} 607 608// Set sets a value in a specified column, with the given timestamp. 609// The timestamp will be truncated to millisecond granularity. 610// A timestamp of ServerTime means to use the server timestamp. 
611func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) { 612 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ 613 FamilyName: family, 614 ColumnQualifier: []byte(column), 615 TimestampMicros: int64(ts.TruncateToMilliseconds()), 616 Value: value, 617 }}}) 618} 619 620// DeleteCellsInColumn will delete all the cells whose columns are family:column. 621func (m *Mutation) DeleteCellsInColumn(family, column string) { 622 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ 623 FamilyName: family, 624 ColumnQualifier: []byte(column), 625 }}}) 626} 627 628// DeleteTimestampRange deletes all cells whose columns are family:column 629// and whose timestamps are in the half-open interval [start, end). 630// If end is zero, it will be interpreted as infinity. 631// The timestamps will be truncated to millisecond granularity. 632func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) { 633 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ 634 FamilyName: family, 635 ColumnQualifier: []byte(column), 636 TimeRange: &btpb.TimestampRange{ 637 StartTimestampMicros: int64(start.TruncateToMilliseconds()), 638 EndTimestampMicros: int64(end.TruncateToMilliseconds()), 639 }, 640 }}}) 641} 642 643// DeleteCellsInFamily will delete all the cells whose columns are family:*. 644func (m *Mutation) DeleteCellsInFamily(family string) { 645 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{ 646 FamilyName: family, 647 }}}) 648} 649 650// DeleteRow deletes the entire row. 
651func (m *Mutation) DeleteRow() { 652 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}}) 653} 654 655// entryErr is a container that combines an entry with the error that was returned for it. 656// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed. 657type entryErr struct { 658 Entry *btpb.MutateRowsRequest_Entry 659 Err error 660} 661 662// ApplyBulk applies multiple Mutations, up to a maximum of 100,000. 663// Each mutation is individually applied atomically, 664// but the set of mutations may be applied in any order. 665// 666// Two types of failures may occur. If the entire process 667// fails, (nil, err) will be returned. If specific mutations 668// fail to apply, ([]err, nil) will be returned, and the errors 669// will correspond to the relevant rowKeys/muts arguments. 670// 671// Conditional mutations cannot be applied in bulk and providing one will result in an error. 
672func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) { 673 ctx = mergeOutgoingMetadata(ctx, t.md) 674 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk") 675 defer func() { trace.EndSpan(ctx, err) }() 676 677 if len(rowKeys) != len(muts) { 678 return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts)) 679 } 680 681 origEntries := make([]*entryErr, len(rowKeys)) 682 for i, key := range rowKeys { 683 mut := muts[i] 684 if mut.cond != nil { 685 return nil, errors.New("conditional mutations cannot be applied in bulk") 686 } 687 origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}} 688 } 689 690 for _, group := range groupEntries(origEntries, maxMutations) { 691 attrMap := make(map[string]interface{}) 692 err = gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 693 attrMap["rowCount"] = len(group) 694 trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") 695 err := t.doApplyBulk(ctx, group, opts...) 696 if err != nil { 697 // We want to retry the entire request with the current group 698 return err 699 } 700 group = t.getApplyBulkRetries(group) 701 if len(group) > 0 && len(idempotentRetryCodes) > 0 { 702 // We have at least one mutation that needs to be retried. 703 // Return an arbitrary error that is retryable according to callOptions. 704 return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk") 705 } 706 return nil 707 }, retryOptions...) 708 if err != nil { 709 return nil, err 710 } 711 } 712 713 // All the errors are accumulated into an array and returned, interspersed with nils for successful 714 // entries. The absence of any errors means we should return nil. 
715 var foundErr bool 716 for _, entry := range origEntries { 717 if entry.Err != nil { 718 foundErr = true 719 } 720 errs = append(errs, entry.Err) 721 } 722 if foundErr { 723 return errs, nil 724 } 725 return nil, nil 726} 727 728// getApplyBulkRetries returns the entries that need to be retried 729func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr { 730 var retryEntries []*entryErr 731 for _, entry := range entries { 732 err := entry.Err 733 if err != nil && isIdempotentRetryCode[status.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) { 734 // There was an error and the entry is retryable. 735 retryEntries = append(retryEntries, entry) 736 } 737 } 738 return retryEntries 739} 740 741// doApplyBulk does the work of a single ApplyBulk invocation 742func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error { 743 after := func(res proto.Message) { 744 for _, o := range opts { 745 o.after(res) 746 } 747 } 748 749 entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) 750 for i, entryErr := range entryErrs { 751 entries[i] = entryErr.Entry 752 } 753 req := &btpb.MutateRowsRequest{ 754 TableName: t.c.fullTableName(t.table), 755 AppProfileId: t.c.appProfile, 756 Entries: entries, 757 } 758 stream, err := t.c.client.MutateRows(ctx, req) 759 if err != nil { 760 return err 761 } 762 for { 763 res, err := stream.Recv() 764 if err == io.EOF { 765 break 766 } 767 if err != nil { 768 return err 769 } 770 771 for i, entry := range res.Entries { 772 s := entry.Status 773 if s.Code == int32(codes.OK) { 774 entryErrs[i].Err = nil 775 } else { 776 entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message) 777 } 778 } 779 after(res) 780 } 781 return nil 782} 783 784// groupEntries groups entries into groups of a specified size without breaking up 785// individual entries. 
786func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr { 787 var ( 788 res [][]*entryErr 789 start int 790 gmuts int 791 ) 792 addGroup := func(end int) { 793 if end-start > 0 { 794 res = append(res, entries[start:end]) 795 start = end 796 gmuts = 0 797 } 798 } 799 for i, e := range entries { 800 emuts := len(e.Entry.Mutations) 801 if gmuts+emuts > maxSize { 802 addGroup(i) 803 } 804 gmuts += emuts 805 } 806 addGroup(len(entries)) 807 return res 808} 809 810// Timestamp is in units of microseconds since 1 January 1970. 811type Timestamp int64 812 813// ServerTime is a specific Timestamp that may be passed to (*Mutation).Set. 814// It indicates that the server's timestamp should be used. 815const ServerTime Timestamp = -1 816 817// Time converts a time.Time into a Timestamp. 818func Time(t time.Time) Timestamp { return Timestamp(t.UnixNano() / 1e3) } 819 820// Now returns the Timestamp representation of the current time on the client. 821func Now() Timestamp { return Time(time.Now()) } 822 823// Time converts a Timestamp into a time.Time. 824func (ts Timestamp) Time() time.Time { return time.Unix(int64(ts)/1e6, int64(ts)%1e6*1e3) } 825 826// TruncateToMilliseconds truncates a Timestamp to millisecond granularity, 827// which is currently the only granularity supported. 828func (ts Timestamp) TruncateToMilliseconds() Timestamp { 829 if ts == ServerTime { 830 return ts 831 } 832 return ts - ts%1000 833} 834 835// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row. 836// It returns the newly written cells. 
837func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { 838 ctx = mergeOutgoingMetadata(ctx, t.md) 839 req := &btpb.ReadModifyWriteRowRequest{ 840 TableName: t.c.fullTableName(t.table), 841 AppProfileId: t.c.appProfile, 842 RowKey: []byte(row), 843 Rules: m.ops, 844 } 845 res, err := t.c.client.ReadModifyWriteRow(ctx, req) 846 if err != nil { 847 return nil, err 848 } 849 if res.Row == nil { 850 return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil") 851 } 852 r := make(Row) 853 for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family 854 decodeFamilyProto(r, row, fam) 855 } 856 return r, nil 857} 858 859// ReadModifyWrite represents a set of operations on a single row of a table. 860// It is like Mutation but for non-idempotent changes. 861// When applied, these operations operate on the latest values of the row's cells, 862// and result in a new value being written to the relevant cell with a timestamp 863// that is max(existing timestamp, current server time). 864// 865// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will 866// be executed serially by the server. 867type ReadModifyWrite struct { 868 ops []*btpb.ReadModifyWriteRule 869} 870 871// NewReadModifyWrite returns a new ReadModifyWrite. 872func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) } 873 874// AppendValue appends a value to a specific cell's value. 875// If the cell is unset, it will be treated as an empty value. 876func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) { 877 m.ops = append(m.ops, &btpb.ReadModifyWriteRule{ 878 FamilyName: family, 879 ColumnQualifier: []byte(column), 880 Rule: &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v}, 881 }) 882} 883 884// Increment interprets the value in a specific cell as a 64-bit big-endian signed integer, 885// and adds a value to it. If the cell is unset, it will be treated as zero. 
886// If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite 887// operation will fail. 888func (m *ReadModifyWrite) Increment(family, column string, delta int64) { 889 m.ops = append(m.ops, &btpb.ReadModifyWriteRule{ 890 FamilyName: family, 891 ColumnQualifier: []byte(column), 892 Rule: &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta}, 893 }) 894} 895 896// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of 897// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces. 898func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { 899 ctx = mergeOutgoingMetadata(ctx, t.md) 900 var sampledRowKeys []string 901 err := gax.Invoke(ctx, func(ctx context.Context, _ gax.CallSettings) error { 902 sampledRowKeys = nil 903 req := &btpb.SampleRowKeysRequest{ 904 TableName: t.c.fullTableName(t.table), 905 AppProfileId: t.c.appProfile, 906 } 907 ctx, cancel := context.WithCancel(ctx) // for aborting the stream 908 defer cancel() 909 910 stream, err := t.c.client.SampleRowKeys(ctx, req) 911 if err != nil { 912 return err 913 } 914 for { 915 res, err := stream.Recv() 916 if err == io.EOF { 917 break 918 } 919 if err != nil { 920 return err 921 } 922 923 key := string(res.RowKey) 924 if key == "" { 925 continue 926 } 927 928 sampledRowKeys = append(sampledRowKeys, key) 929 } 930 return nil 931 }, retryOptions...) 932 return sampledRowKeys, err 933} 934