/*
Copyright 2015 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bigtable // import "cloud.google.com/go/bigtable"

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"time"

	"cloud.google.com/go/bigtable/internal/gax"
	btopt "cloud.google.com/go/bigtable/internal/option"
	"cloud.google.com/go/internal/trace"
	"github.com/golang/protobuf/proto"
	"google.golang.org/api/option"
	gtransport "google.golang.org/api/transport/grpc"
	btpb "google.golang.org/genproto/googleapis/bigtable/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

// prodAddr is the production Bigtable endpoint (host:port) handed to
// btopt.DefaultClientOptions when a Client is created.
const prodAddr = "bigtable.googleapis.com:443"

// Client is a client for reading and writing data to tables in an instance.
//
// A Client is safe to use concurrently, except for its Close method.
type Client struct {
	// conn is the gRPC connection dialed by NewClientWithConfig; Close closes it.
	conn *grpc.ClientConn

	// client is the Bigtable data API stub created on top of conn.
	client btpb.BigtableClient

	// project and instance identify the Bigtable instance; fullTableName
	// combines them with a table ID to form the table's resource name.
	project, instance string

	// appProfile is sent as AppProfileId on every data request. An empty
	// value selects the instance's default app profile (see ClientConfig).
	appProfile string
}

// ClientConfig has configurations for the client.
type ClientConfig struct {
	// The id of the app profile to associate with all data operations sent from this client.
	// If unspecified, the default app profile for the instance will be used.
	AppProfile string
}

// NewClient creates a new Client for a given project and instance.
// The default ClientConfig will be used.
61func NewClient(ctx context.Context, project, instance string, opts ...option.ClientOption) (*Client, error) { 62 return NewClientWithConfig(ctx, project, instance, ClientConfig{}, opts...) 63} 64 65// NewClientWithConfig creates a new client with the given config. 66func NewClientWithConfig(ctx context.Context, project, instance string, config ClientConfig, opts ...option.ClientOption) (*Client, error) { 67 o, err := btopt.DefaultClientOptions(prodAddr, Scope, clientUserAgent) 68 if err != nil { 69 return nil, err 70 } 71 // Default to a small connection pool that can be overridden. 72 o = append(o, 73 option.WithGRPCConnectionPool(4), 74 // Set the max size to correspond to server-side limits. 75 option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(1<<28), grpc.MaxCallRecvMsgSize(1<<28))), 76 // TODO(grpc/grpc-go#1388) using connection pool without WithBlock 77 // can cause RPCs to fail randomly. We can delete this after the issue is fixed. 78 option.WithGRPCDialOption(grpc.WithBlock())) 79 o = append(o, opts...) 80 conn, err := gtransport.Dial(ctx, o...) 81 if err != nil { 82 return nil, fmt.Errorf("dialing: %v", err) 83 } 84 85 return &Client{ 86 conn: conn, 87 client: btpb.NewBigtableClient(conn), 88 project: project, 89 instance: instance, 90 appProfile: config.AppProfile, 91 }, nil 92} 93 94// Close closes the Client. 
95func (c *Client) Close() error { 96 return c.conn.Close() 97} 98 99var ( 100 idempotentRetryCodes = []codes.Code{codes.DeadlineExceeded, codes.Unavailable, codes.Aborted} 101 isIdempotentRetryCode = make(map[codes.Code]bool) 102 retryOptions = []gax.CallOption{ 103 gax.WithDelayTimeoutSettings(100*time.Millisecond, 2000*time.Millisecond, 1.2), 104 gax.WithRetryCodes(idempotentRetryCodes), 105 } 106) 107 108func init() { 109 for _, code := range idempotentRetryCodes { 110 isIdempotentRetryCode[code] = true 111 } 112} 113 114func (c *Client) fullTableName(table string) string { 115 return fmt.Sprintf("projects/%s/instances/%s/tables/%s", c.project, c.instance, table) 116} 117 118// A Table refers to a table. 119// 120// A Table is safe to use concurrently. 121type Table struct { 122 c *Client 123 table string 124 125 // Metadata to be sent with each request. 126 md metadata.MD 127} 128 129// Open opens a table. 130func (c *Client) Open(table string) *Table { 131 return &Table{ 132 c: c, 133 table: table, 134 md: metadata.Pairs(resourcePrefixHeader, c.fullTableName(table)), 135 } 136} 137 138// TODO(dsymonds): Read method that returns a sequence of ReadItems. 139 140// ReadRows reads rows from a table. f is called for each row. 141// If f returns false, the stream is shut down and ReadRows returns. 142// f owns its argument, and f is called serially in order by row key. 143// 144// By default, the yielded rows will contain all values in all cells. 145// Use RowFilter to limit the cells returned. 
146func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts ...ReadOption) (err error) { 147 ctx = mergeOutgoingMetadata(ctx, t.md) 148 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable.ReadRows") 149 defer func() { trace.EndSpan(ctx, err) }() 150 151 var prevRowKey string 152 attrMap := make(map[string]interface{}) 153 err = gax.Invoke(ctx, func(ctx context.Context) error { 154 if !arg.valid() { 155 // Empty row set, no need to make an API call. 156 // NOTE: we must return early if arg == RowList{} because reading 157 // an empty RowList from bigtable returns all rows from that table. 158 return nil 159 } 160 req := &btpb.ReadRowsRequest{ 161 TableName: t.c.fullTableName(t.table), 162 AppProfileId: t.c.appProfile, 163 Rows: arg.proto(), 164 } 165 for _, opt := range opts { 166 opt.set(req) 167 } 168 ctx, cancel := context.WithCancel(ctx) // for aborting the stream 169 defer cancel() 170 171 startTime := time.Now() 172 stream, err := t.c.client.ReadRows(ctx, req) 173 if err != nil { 174 return err 175 } 176 cr := newChunkReader() 177 for { 178 res, err := stream.Recv() 179 if err == io.EOF { 180 break 181 } 182 if err != nil { 183 // Reset arg for next Invoke call. 184 arg = arg.retainRowsAfter(prevRowKey) 185 attrMap["rowKey"] = prevRowKey 186 attrMap["error"] = err.Error() 187 attrMap["time_secs"] = time.Since(startTime).Seconds() 188 trace.TracePrintf(ctx, attrMap, "Retry details in ReadRows") 189 return err 190 } 191 attrMap["time_secs"] = time.Since(startTime).Seconds() 192 attrMap["rowCount"] = len(res.Chunks) 193 trace.TracePrintf(ctx, attrMap, "Details in ReadRows") 194 195 for _, cc := range res.Chunks { 196 row, err := cr.Process(cc) 197 if err != nil { 198 // No need to prepare for a retry, this is an unretryable error. 199 return err 200 } 201 if row == nil { 202 continue 203 } 204 prevRowKey = row.Key() 205 if !f(row) { 206 // Cancel and drain stream. 
207 cancel() 208 for { 209 if _, err := stream.Recv(); err != nil { 210 // The stream has ended. We don't return an error 211 // because the caller has intentionally interrupted the scan. 212 return nil 213 } 214 } 215 } 216 } 217 if err := cr.Close(); err != nil { 218 // No need to prepare for a retry, this is an unretryable error. 219 return err 220 } 221 } 222 return err 223 }, retryOptions...) 224 225 return err 226} 227 228// ReadRow is a convenience implementation of a single-row reader. 229// A missing row will return a zero-length map and a nil error. 230func (t *Table) ReadRow(ctx context.Context, row string, opts ...ReadOption) (Row, error) { 231 var r Row 232 err := t.ReadRows(ctx, SingleRow(row), func(rr Row) bool { 233 r = rr 234 return true 235 }, opts...) 236 return r, err 237} 238 239// decodeFamilyProto adds the cell data from f to the given row. 240func decodeFamilyProto(r Row, row string, f *btpb.Family) { 241 fam := f.Name // does not have colon 242 for _, col := range f.Columns { 243 for _, cell := range col.Cells { 244 ri := ReadItem{ 245 Row: row, 246 Column: fam + ":" + string(col.Qualifier), 247 Timestamp: Timestamp(cell.TimestampMicros), 248 Value: cell.Value, 249 } 250 r[fam] = append(r[fam], ri) 251 } 252 } 253} 254 255// RowSet is a set of rows to be read. It is satisfied by RowList, RowRange and RowRangeList. 256// The serialized size of the RowSet must be no larger than 1MiB. 257type RowSet interface { 258 proto() *btpb.RowSet 259 260 // retainRowsAfter returns a new RowSet that does not include the 261 // given row key or any row key lexicographically less than it. 262 retainRowsAfter(lastRowKey string) RowSet 263 264 // Valid reports whether this set can cover at least one row. 265 valid() bool 266} 267 268// RowList is a sequence of row keys. 
269type RowList []string 270 271func (r RowList) proto() *btpb.RowSet { 272 keys := make([][]byte, len(r)) 273 for i, row := range r { 274 keys[i] = []byte(row) 275 } 276 return &btpb.RowSet{RowKeys: keys} 277} 278 279func (r RowList) retainRowsAfter(lastRowKey string) RowSet { 280 var retryKeys RowList 281 for _, key := range r { 282 if key > lastRowKey { 283 retryKeys = append(retryKeys, key) 284 } 285 } 286 return retryKeys 287} 288 289func (r RowList) valid() bool { 290 return len(r) > 0 291} 292 293// A RowRange is a half-open interval [Start, Limit) encompassing 294// all the rows with keys at least as large as Start, and less than Limit. 295// (Bigtable string comparison is the same as Go's.) 296// A RowRange can be unbounded, encompassing all keys at least as large as Start. 297type RowRange struct { 298 start string 299 limit string 300} 301 302// NewRange returns the new RowRange [begin, end). 303func NewRange(begin, end string) RowRange { 304 return RowRange{ 305 start: begin, 306 limit: end, 307 } 308} 309 310// Unbounded tests whether a RowRange is unbounded. 311func (r RowRange) Unbounded() bool { 312 return r.limit == "" 313} 314 315// Contains says whether the RowRange contains the key. 316func (r RowRange) Contains(row string) bool { 317 return r.start <= row && (r.limit == "" || r.limit > row) 318} 319 320// String provides a printable description of a RowRange. 
321func (r RowRange) String() string { 322 a := strconv.Quote(r.start) 323 if r.Unbounded() { 324 return fmt.Sprintf("[%s,∞)", a) 325 } 326 return fmt.Sprintf("[%s,%q)", a, r.limit) 327} 328 329func (r RowRange) proto() *btpb.RowSet { 330 rr := &btpb.RowRange{ 331 StartKey: &btpb.RowRange_StartKeyClosed{StartKeyClosed: []byte(r.start)}, 332 } 333 if !r.Unbounded() { 334 rr.EndKey = &btpb.RowRange_EndKeyOpen{EndKeyOpen: []byte(r.limit)} 335 } 336 return &btpb.RowSet{RowRanges: []*btpb.RowRange{rr}} 337} 338 339func (r RowRange) retainRowsAfter(lastRowKey string) RowSet { 340 if lastRowKey == "" || lastRowKey < r.start { 341 return r 342 } 343 // Set the beginning of the range to the row after the last scanned. 344 start := lastRowKey + "\x00" 345 if r.Unbounded() { 346 return InfiniteRange(start) 347 } 348 return NewRange(start, r.limit) 349} 350 351func (r RowRange) valid() bool { 352 return r.Unbounded() || r.start < r.limit 353} 354 355// RowRangeList is a sequence of RowRanges representing the union of the ranges. 356type RowRangeList []RowRange 357 358func (r RowRangeList) proto() *btpb.RowSet { 359 ranges := make([]*btpb.RowRange, len(r)) 360 for i, rr := range r { 361 // RowRange.proto() returns a RowSet with a single element RowRange array 362 ranges[i] = rr.proto().RowRanges[0] 363 } 364 return &btpb.RowSet{RowRanges: ranges} 365} 366 367func (r RowRangeList) retainRowsAfter(lastRowKey string) RowSet { 368 if lastRowKey == "" { 369 return r 370 } 371 // Return a list of any range that has not yet been completely processed 372 var ranges RowRangeList 373 for _, rr := range r { 374 retained := rr.retainRowsAfter(lastRowKey) 375 if retained.valid() { 376 ranges = append(ranges, retained.(RowRange)) 377 } 378 } 379 return ranges 380} 381 382func (r RowRangeList) valid() bool { 383 for _, rr := range r { 384 if rr.valid() { 385 return true 386 } 387 } 388 return false 389} 390 391// SingleRow returns a RowSet for reading a single row. 
392func SingleRow(row string) RowSet { 393 return RowList{row} 394} 395 396// PrefixRange returns a RowRange consisting of all keys starting with the prefix. 397func PrefixRange(prefix string) RowRange { 398 return RowRange{ 399 start: prefix, 400 limit: prefixSuccessor(prefix), 401 } 402} 403 404// InfiniteRange returns the RowRange consisting of all keys at least as 405// large as start. 406func InfiniteRange(start string) RowRange { 407 return RowRange{ 408 start: start, 409 limit: "", 410 } 411} 412 413// prefixSuccessor returns the lexically smallest string greater than the 414// prefix, if it exists, or "" otherwise. In either case, it is the string 415// needed for the Limit of a RowRange. 416func prefixSuccessor(prefix string) string { 417 if prefix == "" { 418 return "" // infinite range 419 } 420 n := len(prefix) 421 for n--; n >= 0 && prefix[n] == '\xff'; n-- { 422 } 423 if n == -1 { 424 return "" 425 } 426 ans := []byte(prefix[:n]) 427 ans = append(ans, prefix[n]+1) 428 return string(ans) 429} 430 431// A ReadOption is an optional argument to ReadRows. 432type ReadOption interface { 433 set(req *btpb.ReadRowsRequest) 434} 435 436// RowFilter returns a ReadOption that applies f to the contents of read rows. 437// 438// If multiple RowFilters are provided, only the last is used. To combine filters, 439// use ChainFilters or InterleaveFilters instead. 440func RowFilter(f Filter) ReadOption { return rowFilter{f} } 441 442type rowFilter struct{ f Filter } 443 444func (rf rowFilter) set(req *btpb.ReadRowsRequest) { req.Filter = rf.f.proto() } 445 446// LimitRows returns a ReadOption that will limit the number of rows to be read. 447func LimitRows(limit int64) ReadOption { return limitRows{limit} } 448 449type limitRows struct{ limit int64 } 450 451func (lr limitRows) set(req *btpb.ReadRowsRequest) { req.RowsLimit = lr.limit } 452 453// mutationsAreRetryable returns true if all mutations are idempotent 454// and therefore retryable. 
A mutation is idempotent iff all cell timestamps 455// have an explicit timestamp set and do not rely on the timestamp being set on the server. 456func mutationsAreRetryable(muts []*btpb.Mutation) bool { 457 serverTime := int64(ServerTime) 458 for _, mut := range muts { 459 setCell := mut.GetSetCell() 460 if setCell != nil && setCell.TimestampMicros == serverTime { 461 return false 462 } 463 } 464 return true 465} 466 467const maxMutations = 100000 468 469// Apply mutates a row atomically. A mutation must contain at least one 470// operation and at most 100000 operations. 471func (t *Table) Apply(ctx context.Context, row string, m *Mutation, opts ...ApplyOption) (err error) { 472 ctx = mergeOutgoingMetadata(ctx, t.md) 473 after := func(res proto.Message) { 474 for _, o := range opts { 475 o.after(res) 476 } 477 } 478 479 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/Apply") 480 defer func() { trace.EndSpan(ctx, err) }() 481 var callOptions []gax.CallOption 482 if m.cond == nil { 483 req := &btpb.MutateRowRequest{ 484 TableName: t.c.fullTableName(t.table), 485 AppProfileId: t.c.appProfile, 486 RowKey: []byte(row), 487 Mutations: m.ops, 488 } 489 if mutationsAreRetryable(m.ops) { 490 callOptions = retryOptions 491 } 492 var res *btpb.MutateRowResponse 493 err := gax.Invoke(ctx, func(ctx context.Context) error { 494 var err error 495 res, err = t.c.client.MutateRow(ctx, req) 496 return err 497 }, callOptions...) 
498 if err == nil { 499 after(res) 500 } 501 return err 502 } 503 504 req := &btpb.CheckAndMutateRowRequest{ 505 TableName: t.c.fullTableName(t.table), 506 AppProfileId: t.c.appProfile, 507 RowKey: []byte(row), 508 PredicateFilter: m.cond.proto(), 509 } 510 if m.mtrue != nil { 511 if m.mtrue.cond != nil { 512 return errors.New("bigtable: conditional mutations cannot be nested") 513 } 514 req.TrueMutations = m.mtrue.ops 515 } 516 if m.mfalse != nil { 517 if m.mfalse.cond != nil { 518 return errors.New("bigtable: conditional mutations cannot be nested") 519 } 520 req.FalseMutations = m.mfalse.ops 521 } 522 if mutationsAreRetryable(req.TrueMutations) && mutationsAreRetryable(req.FalseMutations) { 523 callOptions = retryOptions 524 } 525 var cmRes *btpb.CheckAndMutateRowResponse 526 err = gax.Invoke(ctx, func(ctx context.Context) error { 527 var err error 528 cmRes, err = t.c.client.CheckAndMutateRow(ctx, req) 529 return err 530 }, callOptions...) 531 if err == nil { 532 after(cmRes) 533 } 534 return err 535} 536 537// An ApplyOption is an optional argument to Apply. 538type ApplyOption interface { 539 after(res proto.Message) 540} 541 542type applyAfterFunc func(res proto.Message) 543 544func (a applyAfterFunc) after(res proto.Message) { a(res) } 545 546// GetCondMutationResult returns an ApplyOption that reports whether the conditional 547// mutation's condition matched. 548func GetCondMutationResult(matched *bool) ApplyOption { 549 return applyAfterFunc(func(res proto.Message) { 550 if res, ok := res.(*btpb.CheckAndMutateRowResponse); ok { 551 *matched = res.PredicateMatched 552 } 553 }) 554} 555 556// Mutation represents a set of changes for a single row of a table. 557type Mutation struct { 558 ops []*btpb.Mutation 559 560 // for conditional mutations 561 cond Filter 562 mtrue, mfalse *Mutation 563} 564 565// NewMutation returns a new mutation. 566func NewMutation() *Mutation { 567 return new(Mutation) 568} 569 570// NewCondMutation returns a conditional mutation. 
571// The given row filter determines which mutation is applied: 572// If the filter matches any cell in the row, mtrue is applied; 573// otherwise, mfalse is applied. 574// Either given mutation may be nil. 575func NewCondMutation(cond Filter, mtrue, mfalse *Mutation) *Mutation { 576 return &Mutation{cond: cond, mtrue: mtrue, mfalse: mfalse} 577} 578 579// Set sets a value in a specified column, with the given timestamp. 580// The timestamp will be truncated to millisecond granularity. 581// A timestamp of ServerTime means to use the server timestamp. 582func (m *Mutation) Set(family, column string, ts Timestamp, value []byte) { 583 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_SetCell_{SetCell: &btpb.Mutation_SetCell{ 584 FamilyName: family, 585 ColumnQualifier: []byte(column), 586 TimestampMicros: int64(ts.TruncateToMilliseconds()), 587 Value: value, 588 }}}) 589} 590 591// DeleteCellsInColumn will delete all the cells whose columns are family:column. 592func (m *Mutation) DeleteCellsInColumn(family, column string) { 593 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ 594 FamilyName: family, 595 ColumnQualifier: []byte(column), 596 }}}) 597} 598 599// DeleteTimestampRange deletes all cells whose columns are family:column 600// and whose timestamps are in the half-open interval [start, end). 601// If end is zero, it will be interpreted as infinity. 602// The timestamps will be truncated to millisecond granularity. 
603func (m *Mutation) DeleteTimestampRange(family, column string, start, end Timestamp) { 604 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromColumn_{DeleteFromColumn: &btpb.Mutation_DeleteFromColumn{ 605 FamilyName: family, 606 ColumnQualifier: []byte(column), 607 TimeRange: &btpb.TimestampRange{ 608 StartTimestampMicros: int64(start.TruncateToMilliseconds()), 609 EndTimestampMicros: int64(end.TruncateToMilliseconds()), 610 }, 611 }}}) 612} 613 614// DeleteCellsInFamily will delete all the cells whose columns are family:*. 615func (m *Mutation) DeleteCellsInFamily(family string) { 616 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromFamily_{DeleteFromFamily: &btpb.Mutation_DeleteFromFamily{ 617 FamilyName: family, 618 }}}) 619} 620 621// DeleteRow deletes the entire row. 622func (m *Mutation) DeleteRow() { 623 m.ops = append(m.ops, &btpb.Mutation{Mutation: &btpb.Mutation_DeleteFromRow_{DeleteFromRow: &btpb.Mutation_DeleteFromRow{}}}) 624} 625 626// entryErr is a container that combines an entry with the error that was returned for it. 627// Err may be nil if no error was returned for the Entry, or if the Entry has not yet been processed. 628type entryErr struct { 629 Entry *btpb.MutateRowsRequest_Entry 630 Err error 631} 632 633// ApplyBulk applies multiple Mutations, up to a maximum of 100,000. 634// Each mutation is individually applied atomically, 635// but the set of mutations may be applied in any order. 636// 637// Two types of failures may occur. If the entire process 638// fails, (nil, err) will be returned. If specific mutations 639// fail to apply, ([]err, nil) will be returned, and the errors 640// will correspond to the relevant rowKeys/muts arguments. 641// 642// Conditional mutations cannot be applied in bulk and providing one will result in an error. 
643func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutation, opts ...ApplyOption) (errs []error, err error) { 644 ctx = mergeOutgoingMetadata(ctx, t.md) 645 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigtable/ApplyBulk") 646 defer func() { trace.EndSpan(ctx, err) }() 647 648 if len(rowKeys) != len(muts) { 649 return nil, fmt.Errorf("mismatched rowKeys and mutation array lengths: %d, %d", len(rowKeys), len(muts)) 650 } 651 652 origEntries := make([]*entryErr, len(rowKeys)) 653 for i, key := range rowKeys { 654 mut := muts[i] 655 if mut.cond != nil { 656 return nil, errors.New("conditional mutations cannot be applied in bulk") 657 } 658 origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}} 659 } 660 661 for _, group := range groupEntries(origEntries, maxMutations) { 662 attrMap := make(map[string]interface{}) 663 err = gax.Invoke(ctx, func(ctx context.Context) error { 664 attrMap["rowCount"] = len(group) 665 trace.TracePrintf(ctx, attrMap, "Row count in ApplyBulk") 666 err := t.doApplyBulk(ctx, group, opts...) 667 if err != nil { 668 // We want to retry the entire request with the current group 669 return err 670 } 671 group = t.getApplyBulkRetries(group) 672 if len(group) > 0 && len(idempotentRetryCodes) > 0 { 673 // We have at least one mutation that needs to be retried. 674 // Return an arbitrary error that is retryable according to callOptions. 675 return status.Errorf(idempotentRetryCodes[0], "Synthetic error: partial failure of ApplyBulk") 676 } 677 return nil 678 }, retryOptions...) 679 if err != nil { 680 return nil, err 681 } 682 } 683 684 // All the errors are accumulated into an array and returned, interspersed with nils for successful 685 // entries. The absence of any errors means we should return nil. 
686 var foundErr bool 687 for _, entry := range origEntries { 688 if entry.Err != nil { 689 foundErr = true 690 } 691 errs = append(errs, entry.Err) 692 } 693 if foundErr { 694 return errs, nil 695 } 696 return nil, nil 697} 698 699// getApplyBulkRetries returns the entries that need to be retried 700func (t *Table) getApplyBulkRetries(entries []*entryErr) []*entryErr { 701 var retryEntries []*entryErr 702 for _, entry := range entries { 703 err := entry.Err 704 if err != nil && isIdempotentRetryCode[grpc.Code(err)] && mutationsAreRetryable(entry.Entry.Mutations) { 705 // There was an error and the entry is retryable. 706 retryEntries = append(retryEntries, entry) 707 } 708 } 709 return retryEntries 710} 711 712// doApplyBulk does the work of a single ApplyBulk invocation 713func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, opts ...ApplyOption) error { 714 after := func(res proto.Message) { 715 for _, o := range opts { 716 o.after(res) 717 } 718 } 719 720 entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) 721 for i, entryErr := range entryErrs { 722 entries[i] = entryErr.Entry 723 } 724 req := &btpb.MutateRowsRequest{ 725 TableName: t.c.fullTableName(t.table), 726 AppProfileId: t.c.appProfile, 727 Entries: entries, 728 } 729 stream, err := t.c.client.MutateRows(ctx, req) 730 if err != nil { 731 return err 732 } 733 for { 734 res, err := stream.Recv() 735 if err == io.EOF { 736 break 737 } 738 if err != nil { 739 return err 740 } 741 742 for i, entry := range res.Entries { 743 s := entry.Status 744 if s.Code == int32(codes.OK) { 745 entryErrs[i].Err = nil 746 } else { 747 entryErrs[i].Err = status.Errorf(codes.Code(s.Code), s.Message) 748 } 749 } 750 after(res) 751 } 752 return nil 753} 754 755// groupEntries groups entries into groups of a specified size without breaking up 756// individual entries. 
757func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr { 758 var ( 759 res [][]*entryErr 760 start int 761 gmuts int 762 ) 763 addGroup := func(end int) { 764 if end-start > 0 { 765 res = append(res, entries[start:end]) 766 start = end 767 gmuts = 0 768 } 769 } 770 for i, e := range entries { 771 emuts := len(e.Entry.Mutations) 772 if gmuts+emuts > maxSize { 773 addGroup(i) 774 } 775 gmuts += emuts 776 } 777 addGroup(len(entries)) 778 return res 779} 780 781// Timestamp is in units of microseconds since 1 January 1970. 782type Timestamp int64 783 784// ServerTime is a specific Timestamp that may be passed to (*Mutation).Set. 785// It indicates that the server's timestamp should be used. 786const ServerTime Timestamp = -1 787 788// Time converts a time.Time into a Timestamp. 789func Time(t time.Time) Timestamp { return Timestamp(t.UnixNano() / 1e3) } 790 791// Now returns the Timestamp representation of the current time on the client. 792func Now() Timestamp { return Time(time.Now()) } 793 794// Time converts a Timestamp into a time.Time. 795func (ts Timestamp) Time() time.Time { return time.Unix(0, int64(ts)*1e3) } 796 797// TruncateToMilliseconds truncates a Timestamp to millisecond granularity, 798// which is currently the only granularity supported. 799func (ts Timestamp) TruncateToMilliseconds() Timestamp { 800 if ts == ServerTime { 801 return ts 802 } 803 return ts - ts%1000 804} 805 806// ApplyReadModifyWrite applies a ReadModifyWrite to a specific row. 807// It returns the newly written cells. 
808func (t *Table) ApplyReadModifyWrite(ctx context.Context, row string, m *ReadModifyWrite) (Row, error) { 809 ctx = mergeOutgoingMetadata(ctx, t.md) 810 req := &btpb.ReadModifyWriteRowRequest{ 811 TableName: t.c.fullTableName(t.table), 812 AppProfileId: t.c.appProfile, 813 RowKey: []byte(row), 814 Rules: m.ops, 815 } 816 res, err := t.c.client.ReadModifyWriteRow(ctx, req) 817 if err != nil { 818 return nil, err 819 } 820 if res.Row == nil { 821 return nil, errors.New("unable to apply ReadModifyWrite: res.Row=nil") 822 } 823 r := make(Row) 824 for _, fam := range res.Row.Families { // res is *btpb.Row, fam is *btpb.Family 825 decodeFamilyProto(r, row, fam) 826 } 827 return r, nil 828} 829 830// ReadModifyWrite represents a set of operations on a single row of a table. 831// It is like Mutation but for non-idempotent changes. 832// When applied, these operations operate on the latest values of the row's cells, 833// and result in a new value being written to the relevant cell with a timestamp 834// that is max(existing timestamp, current server time). 835// 836// The application of a ReadModifyWrite is atomic; concurrent ReadModifyWrites will 837// be executed serially by the server. 838type ReadModifyWrite struct { 839 ops []*btpb.ReadModifyWriteRule 840} 841 842// NewReadModifyWrite returns a new ReadModifyWrite. 843func NewReadModifyWrite() *ReadModifyWrite { return new(ReadModifyWrite) } 844 845// AppendValue appends a value to a specific cell's value. 846// If the cell is unset, it will be treated as an empty value. 847func (m *ReadModifyWrite) AppendValue(family, column string, v []byte) { 848 m.ops = append(m.ops, &btpb.ReadModifyWriteRule{ 849 FamilyName: family, 850 ColumnQualifier: []byte(column), 851 Rule: &btpb.ReadModifyWriteRule_AppendValue{AppendValue: v}, 852 }) 853} 854 855// Increment interprets the value in a specific cell as a 64-bit big-endian signed integer, 856// and adds a value to it. If the cell is unset, it will be treated as zero. 
857// If the cell is set and is not an 8-byte value, the entire ApplyReadModifyWrite 858// operation will fail. 859func (m *ReadModifyWrite) Increment(family, column string, delta int64) { 860 m.ops = append(m.ops, &btpb.ReadModifyWriteRule{ 861 FamilyName: family, 862 ColumnQualifier: []byte(column), 863 Rule: &btpb.ReadModifyWriteRule_IncrementAmount{IncrementAmount: delta}, 864 }) 865} 866 867// mergeOutgoingMetadata returns a context populated by the existing outgoing metadata, 868// if any, joined with internal metadata. 869func mergeOutgoingMetadata(ctx context.Context, md metadata.MD) context.Context { 870 mdCopy, _ := metadata.FromOutgoingContext(ctx) 871 return metadata.NewOutgoingContext(ctx, metadata.Join(mdCopy, md)) 872} 873 874// SampleRowKeys returns a sample of row keys in the table. The returned row keys will delimit contiguous sections of 875// the table of approximately equal size, which can be used to break up the data for distributed tasks like mapreduces. 876func (t *Table) SampleRowKeys(ctx context.Context) ([]string, error) { 877 ctx = mergeOutgoingMetadata(ctx, t.md) 878 var sampledRowKeys []string 879 err := gax.Invoke(ctx, func(ctx context.Context) error { 880 sampledRowKeys = nil 881 req := &btpb.SampleRowKeysRequest{ 882 TableName: t.c.fullTableName(t.table), 883 AppProfileId: t.c.appProfile, 884 } 885 ctx, cancel := context.WithCancel(ctx) // for aborting the stream 886 defer cancel() 887 888 stream, err := t.c.client.SampleRowKeys(ctx, req) 889 if err != nil { 890 return err 891 } 892 for { 893 res, err := stream.Recv() 894 if err == io.EOF { 895 break 896 } 897 if err != nil { 898 return err 899 } 900 901 key := string(res.RowKey) 902 if key == "" { 903 continue 904 } 905 906 sampledRowKeys = append(sampledRowKeys, key) 907 } 908 return nil 909 }, retryOptions...) 910 return sampledRowKeys, err 911} 912