1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 vkit "cloud.google.com/go/spanner/apiv1" 27 "github.com/golang/protobuf/proto" 28 "google.golang.org/api/iterator" 29 sppb "google.golang.org/genproto/googleapis/spanner/v1" 30 "google.golang.org/grpc/codes" 31 "google.golang.org/grpc/status" 32) 33 34// transactionID stores a transaction ID which uniquely identifies a transaction 35// in Cloud Spanner. 36type transactionID []byte 37 38// txReadEnv manages a read-transaction environment consisting of a session 39// handle and a transaction selector. 40type txReadEnv interface { 41 // acquire returns a read-transaction environment that can be used to 42 // perform a transactional read. 43 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 44 // sets the transaction's read timestamp 45 setTimestamp(time.Time) 46 // release should be called at the end of every transactional read to deal 47 // with session recycling. 48 release(error) 49} 50 51// txReadOnly contains methods for doing transactional reads. 52type txReadOnly struct { 53 // read-transaction environment for performing transactional read 54 // operations. 55 txReadEnv 56 57 // Atomic. Only needed for DML statements, but used forall. 58 sequenceNumber int64 59 60 // replaceSessionFunc is a function that can be called to replace the 61 // session that is used by the transaction. This function should only be 62 // defined for single-use transactions that can safely be retried on a 63 // different session. All other transactions will set this function to nil. 64 replaceSessionFunc func(ctx context.Context) error 65 66 // sp is the session pool for allocating a session to execute the read-only 67 // transaction. It is set only once during initialization of the 68 // txReadOnly. 69 sp *sessionPool 70 // sh is the sessionHandle allocated from sp. 71 sh *sessionHandle 72 73 // qo provides options for executing a sql query. 74 qo QueryOptions 75 76 // txOpts provides options for a transaction. 77 txOpts TransactionOptions 78} 79 80// TransactionOptions provides options for a transaction. 81type TransactionOptions struct { 82 CommitOptions CommitOptions 83 84 // The transaction tag to use for a read/write transaction. 85 // This tag is automatically included with each statement and the commit 86 // request of a read/write transaction. 87 TransactionTag string 88 89 // CommitPriority is the priority to use for the Commit RPC for the 90 // transaction. 91 CommitPriority sppb.RequestOptions_Priority 92} 93 94func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority { 95 return to.CommitPriority 96} 97 98func (to *TransactionOptions) requestTag() string { 99 return "" 100} 101 102// errSessionClosed returns error for using a recycled/destroyed session 103func errSessionClosed(sh *sessionHandle) error { 104 return spannerErrorf(codes.FailedPrecondition, 105 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 106} 107 108// Read returns a RowIterator for reading multiple rows from the database. 109func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 110 return t.ReadWithOptions(ctx, table, keys, columns, nil) 111} 112 113// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 114func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 115 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 116} 117 118// ReadOptions provides options for reading rows from a database. 119type ReadOptions struct { 120 // The index to use for reading. If non-empty, you can only read columns 121 // that are part of the index key, part of the primary key, or stored in the 122 // index due to a STORING clause in the index definition. 123 Index string 124 125 // The maximum number of rows to read. A limit value less than 1 means no 126 // limit. 127 Limit int 128 129 // Priority is the RPC priority to use for the operation. 130 Priority sppb.RequestOptions_Priority 131 132 // The request tag to use for this request. 133 RequestTag string 134} 135 136// ReadWithOptions returns a RowIterator for reading multiple rows from the 137// database. Pass a ReadOptions to modify the read operation. 138func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 139 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 140 defer func() { trace.EndSpan(ctx, ri.err) }() 141 var ( 142 sh *sessionHandle 143 ts *sppb.TransactionSelector 144 err error 145 ) 146 kset, err := keys.keySetProto() 147 if err != nil { 148 return &RowIterator{err: err} 149 } 150 if sh, ts, err = t.acquire(ctx); err != nil { 151 return &RowIterator{err: err} 152 } 153 // Cloud Spanner will return "Session not found" on bad sessions. 154 client := sh.getClient() 155 if client == nil { 156 // Might happen if transaction is closed in the middle of a API call. 157 return &RowIterator{err: errSessionClosed(sh)} 158 } 159 index := "" 160 limit := 0 161 prio := sppb.RequestOptions_PRIORITY_UNSPECIFIED 162 requestTag := "" 163 if opts != nil { 164 index = opts.Index 165 if opts.Limit > 0 { 166 limit = opts.Limit 167 } 168 prio = opts.Priority 169 requestTag = opts.RequestTag 170 } 171 return streamWithReplaceSessionFunc( 172 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 173 sh.session.logger, 174 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 175 return client.StreamingRead(ctx, 176 &sppb.ReadRequest{ 177 Session: t.sh.getID(), 178 Transaction: ts, 179 Table: table, 180 Index: index, 181 Columns: columns, 182 KeySet: kset, 183 ResumeToken: resumeToken, 184 Limit: int64(limit), 185 RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag), 186 }) 187 }, 188 t.replaceSessionFunc, 189 t.setTimestamp, 190 t.release, 191 ) 192} 193 194// errRowNotFound returns error for not being able to read the row identified by 195// key. 196func errRowNotFound(table string, key Key) error { 197 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 198} 199 200// errRowNotFoundByIndex returns error for not being able to read the row by index. 201func errRowNotFoundByIndex(table string, key Key, index string) error { 202 return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 203} 204 205// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index. 206func errMultipleRowsFound(table string, key Key, index string) error { 207 return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 208} 209 210// ReadRow reads a single row from the database. 211// 212// If no row is present with the given key, then ReadRow returns an error where 213// spanner.ErrCode(err) is codes.NotFound. 214func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 215 iter := t.Read(ctx, table, key, columns) 216 defer iter.Stop() 217 row, err := iter.Next() 218 switch err { 219 case iterator.Done: 220 return nil, errRowNotFound(table, key) 221 case nil: 222 return row, nil 223 default: 224 return nil, err 225 } 226} 227 228// ReadRowUsingIndex reads a single row from the database using an index. 229// 230// If no row is present with the given index, then ReadRowUsingIndex returns an 231// error where spanner.ErrCode(err) is codes.NotFound. 232// 233// If more than one row received with the given index, then ReadRowUsingIndex 234// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition. 235func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) { 236 iter := t.ReadUsingIndex(ctx, table, index, key, columns) 237 defer iter.Stop() 238 row, err := iter.Next() 239 switch err { 240 case iterator.Done: 241 return nil, errRowNotFoundByIndex(table, key, index) 242 case nil: 243 // If more than one row found, return an error. 244 _, err := iter.Next() 245 switch err { 246 case iterator.Done: 247 return row, nil 248 case nil: 249 return nil, errMultipleRowsFound(table, key, index) 250 default: 251 return nil, err 252 } 253 default: 254 return nil, err 255 } 256} 257 258// QueryOptions provides options for executing a sql query or update statement. 259type QueryOptions struct { 260 Mode *sppb.ExecuteSqlRequest_QueryMode 261 Options *sppb.ExecuteSqlRequest_QueryOptions 262 263 // Priority is the RPC priority to use for the query/update. 264 Priority sppb.RequestOptions_Priority 265 266 // The request tag to use for this request. 267 RequestTag string 268} 269 270// merge combines two QueryOptions that the input parameter will have higher 271// order of precedence. 272func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { 273 merged := QueryOptions{ 274 Mode: qo.Mode, 275 Options: &sppb.ExecuteSqlRequest_QueryOptions{}, 276 RequestTag: qo.RequestTag, 277 Priority: qo.Priority, 278 } 279 if opts.Mode != nil { 280 merged.Mode = opts.Mode 281 } 282 if opts.RequestTag != "" { 283 merged.RequestTag = opts.RequestTag 284 } 285 if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { 286 merged.Priority = opts.Priority 287 } 288 proto.Merge(merged.Options, qo.Options) 289 proto.Merge(merged.Options, opts.Options) 290 return merged 291} 292 293func createRequestOptions(prio sppb.RequestOptions_Priority, requestTag, transactionTag string) (ro *sppb.RequestOptions) { 294 ro = &sppb.RequestOptions{} 295 if prio != sppb.RequestOptions_PRIORITY_UNSPECIFIED { 296 ro.Priority = prio 297 } 298 if requestTag != "" { 299 ro.RequestTag = requestTag 300 } 301 if transactionTag != "" { 302 ro.TransactionTag = transactionTag 303 } 304 return ro 305} 306 307// Query executes a query against the database. It returns a RowIterator for 308// retrieving the resulting rows. 309// 310// Query returns only row data, without a query plan or execution statistics. 311// Use QueryWithStats to get rows along with the plan and statistics. Use 312// AnalyzeQuery to get just the plan. 313func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 314 mode := sppb.ExecuteSqlRequest_NORMAL 315 return t.query(ctx, statement, QueryOptions{ 316 Mode: &mode, 317 Options: t.qo.Options, 318 }) 319} 320 321// QueryWithOptions executes a SQL statment against the database. It returns 322// a RowIterator for retrieving the resulting rows. The sql query execution 323// will be optimized based on the given query options. 324func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator { 325 return t.query(ctx, statement, t.qo.merge(opts)) 326} 327 328// QueryWithStats executes a SQL statement against the database. It returns 329// a RowIterator for retrieving the resulting rows. The RowIterator will also 330// be populated with a query plan and execution statistics. 331func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 332 mode := sppb.ExecuteSqlRequest_PROFILE 333 return t.query(ctx, statement, QueryOptions{ 334 Mode: &mode, 335 Options: t.qo.Options, 336 }) 337} 338 339// AnalyzeQuery returns the query plan for statement. 340func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 341 mode := sppb.ExecuteSqlRequest_PLAN 342 iter := t.query(ctx, statement, QueryOptions{ 343 Mode: &mode, 344 Options: t.qo.Options, 345 }) 346 defer iter.Stop() 347 for { 348 _, err := iter.Next() 349 if err == iterator.Done { 350 break 351 } 352 if err != nil { 353 return nil, err 354 } 355 } 356 if iter.QueryPlan == nil { 357 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 358 } 359 return iter.QueryPlan, nil 360} 361 362func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) { 363 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 364 defer func() { trace.EndSpan(ctx, ri.err) }() 365 req, sh, err := t.prepareExecuteSQL(ctx, statement, options) 366 if err != nil { 367 return &RowIterator{err: err} 368 } 369 client := sh.getClient() 370 return streamWithReplaceSessionFunc( 371 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 372 sh.session.logger, 373 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 374 req.ResumeToken = resumeToken 375 req.Session = t.sh.getID() 376 return client.ExecuteStreamingSql(ctx, req) 377 }, 378 t.replaceSessionFunc, 379 t.setTimestamp, 380 t.release) 381} 382 383func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 384 sh, ts, err := t.acquire(ctx) 385 if err != nil { 386 return nil, nil, err 387 } 388 // Cloud Spanner will return "Session not found" on bad sessions. 389 sid := sh.getID() 390 if sid == "" { 391 // Might happen if transaction is closed in the middle of a API call. 392 return nil, nil, errSessionClosed(sh) 393 } 394 params, paramTypes, err := stmt.convertParams() 395 if err != nil { 396 return nil, nil, err 397 } 398 mode := sppb.ExecuteSqlRequest_NORMAL 399 if options.Mode != nil { 400 mode = *options.Mode 401 } 402 req := &sppb.ExecuteSqlRequest{ 403 Session: sid, 404 Transaction: ts, 405 Sql: stmt.SQL, 406 QueryMode: mode, 407 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 408 Params: params, 409 ParamTypes: paramTypes, 410 QueryOptions: options.Options, 411 RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag), 412 } 413 return req, sh, nil 414} 415 416// txState is the status of a transaction. 417type txState int 418 419const ( 420 // transaction is new, waiting to be initialized.. 421 txNew txState = iota 422 // transaction is being initialized. 423 txInit 424 // transaction is active and can perform read/write. 425 txActive 426 // transaction is closed, cannot be used anymore. 427 txClosed 428) 429 430// errRtsUnavailable returns error for read transaction's read timestamp being 431// unavailable. 432func errRtsUnavailable() error { 433 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 434} 435 436// errTxClosed returns error for using a closed transaction. 437func errTxClosed() error { 438 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 439} 440 441// errUnexpectedTxState returns error for transaction enters an unexpected state. 442func errUnexpectedTxState(ts txState) error { 443 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 444} 445 446// ReadOnlyTransaction provides a snapshot transaction with guaranteed 447// consistency across reads, but does not allow writes. Read-only transactions 448// can be configured to read at timestamps in the past. 449// 450// Read-only transactions do not take locks. Instead, they work by choosing a 451// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 452// they do not acquire locks, they do not block concurrent read-write 453// transactions. 454// 455// Unlike locking read-write transactions, read-only transactions never abort. 456// They can fail if the chosen read timestamp is garbage collected; however, the 457// default garbage collection policy is generous enough that most applications 458// do not need to worry about this in practice. See the documentation of 459// TimestampBound for more details. 460// 461// A ReadOnlyTransaction consumes resources on the server until Close is called. 462type ReadOnlyTransaction struct { 463 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 464 mu sync.Mutex 465 // txReadOnly contains methods for performing transactional reads. 466 txReadOnly 467 // singleUse indicates that the transaction can be used for only one read. 468 singleUse bool 469 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 470 // ReadOnlyTransaction. 471 tx transactionID 472 // txReadyOrClosed is for broadcasting that transaction ID has been returned 473 // by Cloud Spanner or that transaction is closed. 474 txReadyOrClosed chan struct{} 475 // state is the current transaction status of the ReadOnly transaction. 476 state txState 477 // rts is the read timestamp returned by transactional reads. 478 rts time.Time 479 // tb is the read staleness bound specification for transactional reads. 480 tb TimestampBound 481} 482 483// errTxInitTimeout returns error for timeout in waiting for initialization of 484// the transaction. 485func errTxInitTimeout() error { 486 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 487} 488 489// getTimestampBound returns the read staleness bound specified for the 490// ReadOnlyTransaction. 491func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 492 t.mu.Lock() 493 defer t.mu.Unlock() 494 return t.tb 495} 496 497// begin starts a snapshot read-only Transaction on Cloud Spanner. 498func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 499 var ( 500 locked bool 501 tx transactionID 502 rts time.Time 503 sh *sessionHandle 504 err error 505 res *sppb.Transaction 506 ) 507 defer func() { 508 if !locked { 509 t.mu.Lock() 510 // Not necessary, just to make it clear that t.mu is being held when 511 // locked == true. 512 locked = true 513 } 514 if t.state != txClosed { 515 // Signal other initialization routines. 516 close(t.txReadyOrClosed) 517 t.txReadyOrClosed = make(chan struct{}) 518 } 519 t.mu.Unlock() 520 if err != nil && sh != nil { 521 // Got a valid session handle, but failed to initialize transaction= 522 // on Cloud Spanner. 523 if isSessionNotFoundError(err) { 524 sh.destroy() 525 } 526 // If sh.destroy was already executed, this becomes a noop. 527 sh.recycle() 528 } 529 }() 530 // Retry the BeginTransaction call if a 'Session not found' is returned. 531 for { 532 sh, err = t.sp.take(ctx) 533 if err != nil { 534 return err 535 } 536 res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 537 Session: sh.getID(), 538 Options: &sppb.TransactionOptions{ 539 Mode: &sppb.TransactionOptions_ReadOnly_{ 540 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 541 }, 542 }, 543 }) 544 if isSessionNotFoundError(err) { 545 sh.destroy() 546 continue 547 } else if err == nil { 548 tx = res.Id 549 if res.ReadTimestamp != nil { 550 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 551 } 552 } else { 553 err = ToSpannerError(err) 554 } 555 break 556 } 557 t.mu.Lock() 558 559 // defer function will be executed with t.mu being held. 560 locked = true 561 562 // During the execution of t.begin(), t.Close() was invoked. 563 if t.state == txClosed { 564 return errSessionClosed(sh) 565 } 566 567 // If begin() fails, this allows other queries to take over the 568 // initialization. 569 t.tx = nil 570 if err == nil { 571 t.tx = tx 572 t.rts = rts 573 t.sh = sh 574 // State transite to txActive. 575 t.state = txActive 576 } 577 return err 578} 579 580// acquire implements txReadEnv.acquire. 581func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 582 if err := checkNestedTxn(ctx); err != nil { 583 return nil, nil, err 584 } 585 if t.singleUse { 586 return t.acquireSingleUse(ctx) 587 } 588 return t.acquireMultiUse(ctx) 589} 590 591func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 592 t.mu.Lock() 593 defer t.mu.Unlock() 594 switch t.state { 595 case txClosed: 596 // A closed single-use transaction can never be reused. 597 return nil, nil, errTxClosed() 598 case txNew: 599 t.state = txClosed 600 ts := &sppb.TransactionSelector{ 601 Selector: &sppb.TransactionSelector_SingleUse{ 602 SingleUse: &sppb.TransactionOptions{ 603 Mode: &sppb.TransactionOptions_ReadOnly_{ 604 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 605 }, 606 }, 607 }, 608 } 609 sh, err := t.sp.take(ctx) 610 if err != nil { 611 return nil, nil, err 612 } 613 614 // Install session handle into t, which can be used for readonly 615 // operations later. 616 t.sh = sh 617 return sh, ts, nil 618 } 619 us := t.state 620 621 // SingleUse transaction should only be in either txNew state or txClosed 622 // state. 623 return nil, nil, errUnexpectedTxState(us) 624} 625 626func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 627 for { 628 t.mu.Lock() 629 switch t.state { 630 case txClosed: 631 t.mu.Unlock() 632 return nil, nil, errTxClosed() 633 case txNew: 634 // State transit to txInit so that no further TimestampBound change 635 // is accepted. 636 t.state = txInit 637 t.mu.Unlock() 638 continue 639 case txInit: 640 if t.tx != nil { 641 // Wait for a transaction ID to become ready. 642 txReadyOrClosed := t.txReadyOrClosed 643 t.mu.Unlock() 644 select { 645 case <-txReadyOrClosed: 646 // Need to check transaction state again. 647 continue 648 case <-ctx.Done(): 649 // The waiting for initialization is timeout, return error 650 // directly. 651 return nil, nil, errTxInitTimeout() 652 } 653 } 654 // Take the ownership of initializing the transaction. 655 t.tx = transactionID{} 656 t.mu.Unlock() 657 // Begin a read-only transaction. 658 // 659 // TODO: consider adding a transaction option which allow queries to 660 // initiate transactions by themselves. Note that this option might 661 // not be always good because the ID of the new transaction won't 662 // be ready till the query returns some data or completes. 663 if err := t.begin(ctx); err != nil { 664 return nil, nil, err 665 } 666 667 // If t.begin() succeeded, t.state should have been changed to 668 // txActive, so we can just continue here. 669 continue 670 case txActive: 671 sh := t.sh 672 ts := &sppb.TransactionSelector{ 673 Selector: &sppb.TransactionSelector_Id{ 674 Id: t.tx, 675 }, 676 } 677 t.mu.Unlock() 678 return sh, ts, nil 679 } 680 state := t.state 681 t.mu.Unlock() 682 return nil, nil, errUnexpectedTxState(state) 683 } 684} 685 686func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 687 t.mu.Lock() 688 defer t.mu.Unlock() 689 if t.rts.IsZero() { 690 t.rts = ts 691 } 692} 693 694// release implements txReadEnv.release. 695func (t *ReadOnlyTransaction) release(err error) { 696 t.mu.Lock() 697 sh := t.sh 698 t.mu.Unlock() 699 if sh != nil { // sh could be nil if t.acquire() fails. 700 if isSessionNotFoundError(err) { 701 sh.destroy() 702 } 703 if t.singleUse { 704 // If session handle is already destroyed, this becomes a noop. 705 sh.recycle() 706 } 707 } 708} 709 710// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 711// after being closed. 712func (t *ReadOnlyTransaction) Close() { 713 if t.singleUse { 714 return 715 } 716 t.mu.Lock() 717 if t.state != txClosed { 718 t.state = txClosed 719 close(t.txReadyOrClosed) 720 } 721 sh := t.sh 722 t.mu.Unlock() 723 if sh == nil { 724 return 725 } 726 // If session handle is already destroyed, this becomes a noop. If there are 727 // still active queries and if the recycled session is reused before they 728 // complete, Cloud Spanner will cancel them on behalf of the new transaction 729 // on the session. 730 if sh != nil { 731 sh.recycle() 732 } 733} 734 735// Timestamp returns the timestamp chosen to perform reads and queries in this 736// transaction. The value can only be read after some read or query has either 737// returned some data or completed without returning any data. 738func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 739 t.mu.Lock() 740 defer t.mu.Unlock() 741 if t.rts.IsZero() { 742 return t.rts, errRtsUnavailable() 743 } 744 return t.rts, nil 745} 746 747// WithTimestampBound specifies the TimestampBound to use for read or query. 748// This can only be used before the first read or query is invoked. Note: 749// bounded staleness is not available with general ReadOnlyTransactions; use a 750// single-use ReadOnlyTransaction instead. 751// 752// The returned value is the ReadOnlyTransaction so calls can be chained. 753func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 754 t.mu.Lock() 755 defer t.mu.Unlock() 756 if t.state == txNew { 757 // Only allow to set TimestampBound before the first query. 758 t.tb = tb 759 } 760 return t 761} 762 763// ReadWriteTransaction provides a locking read-write transaction. 764// 765// This type of transaction is the only way to write data into Cloud Spanner; 766// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 767// transactions internally. These transactions rely on pessimistic locking and, 768// if necessary, two-phase commit. Locking read-write transactions may abort, 769// requiring the application to retry. However, the interface exposed by 770// (*Client).ReadWriteTransaction eliminates the need for applications to write 771// retry loops explicitly. 772// 773// Locking transactions may be used to atomically read-modify-write data 774// anywhere in a database. This type of transaction is externally consistent. 775// 776// Clients should attempt to minimize the amount of time a transaction is 777// active. Faster transactions commit with higher probability and cause less 778// contention. Cloud Spanner attempts to keep read locks active as long as the 779// transaction continues to do reads. Long periods of inactivity at the client 780// may cause Cloud Spanner to release a transaction's locks and abort it. 781// 782// Reads performed within a transaction acquire locks on the data being 783// read. Writes can only be done at commit time, after all reads have been 784// completed. Conceptually, a read-write transaction consists of zero or more 785// reads or SQL queries followed by a commit. 786// 787// See (*Client).ReadWriteTransaction for an example. 788// 789// Semantics 790// 791// Cloud Spanner can commit the transaction if all read locks it acquired are 792// still valid at commit time, and it is able to acquire write locks for all 793// writes. Cloud Spanner can abort the transaction for any reason. If a commit 794// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 795// not modified any user data in Cloud Spanner. 796// 797// Unless the transaction commits, Cloud Spanner makes no guarantees about how 798// long the transaction's locks were held for. It is an error to use Cloud 799// Spanner locks for any sort of mutual exclusion other than between Cloud 800// Spanner transactions themselves. 801// 802// Aborted transactions 803// 804// Application code does not need to retry explicitly; RunInTransaction will 805// automatically retry a transaction if an attempt results in an abort. The lock 806// priority of a transaction increases after each prior aborted transaction, 807// meaning that the next attempt has a slightly better chance of success than 808// before. 809// 810// Under some circumstances (e.g., many transactions attempting to modify the 811// same row(s)), a transaction can abort many times in a short period before 812// successfully committing. Thus, it is not a good idea to cap the number of 813// retries a transaction can attempt; instead, it is better to limit the total 814// amount of wall time spent retrying. 815// 816// Idle transactions 817// 818// A transaction is considered idle if it has no outstanding reads or SQL 819// queries and has not started a read or SQL query within the last 10 820// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 821// hold on to locks indefinitely. In that case, the commit will fail with error 822// ABORTED. 823// 824// If this behavior is undesirable, periodically executing a simple SQL query 825// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 826// idle. 827type ReadWriteTransaction struct { 828 // txReadOnly contains methods for performing transactional reads. 829 txReadOnly 830 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 831 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 832 // during the initialization of ReadWriteTransaction. 833 tx transactionID 834 // mu protects concurrent access to the internal states of 835 // ReadWriteTransaction. 836 mu sync.Mutex 837 // state is the current transaction status of the read-write transaction. 838 state txState 839 // wb is the set of buffered mutations waiting to be committed. 840 wb []*Mutation 841} 842 843// BufferWrite adds a list of mutations to the set of updates that will be 844// applied when the transaction is committed. It does not actually apply the 845// write until the transaction is committed, so the operation does not block. 846// The effects of the write won't be visible to any reads (including reads done 847// in the same transaction) until the transaction commits. 848// 849// See the example for Client.ReadWriteTransaction. 850func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 851 t.mu.Lock() 852 defer t.mu.Unlock() 853 if t.state == txClosed { 854 return errTxClosed() 855 } 856 if t.state != txActive { 857 return errUnexpectedTxState(t.state) 858 } 859 t.wb = append(t.wb, ms...) 860 return nil 861} 862 863// Update executes a DML statement against the database. It returns the number 864// of affected rows. Update returns an error if the statement is a query. 865// However, the query is executed, and any data read will be validated upon 866// commit. 867func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 868 mode := sppb.ExecuteSqlRequest_NORMAL 869 return t.update(ctx, stmt, QueryOptions{ 870 Mode: &mode, 871 Options: t.qo.Options, 872 }) 873} 874 875// UpdateWithOptions executes a DML statement against the database. It returns 876// the number of affected rows. The given QueryOptions will be used for the 877// execution of this statement. 878func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 879 return t.update(ctx, stmt, t.qo.merge(opts)) 880} 881 882func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 883 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 884 defer func() { trace.EndSpan(ctx, err) }() 885 req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts) 886 if err != nil { 887 return 0, err 888 } 889 resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) 890 if err != nil { 891 return 0, ToSpannerError(err) 892 } 893 if resultSet.Stats == nil { 894 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 895 } 896 return extractRowCount(resultSet.Stats) 897} 898 899// BatchUpdate groups one or more DML statements and sends them to Spanner in a 900// single RPC. This is an efficient way to execute multiple DML statements. 901// 902// A slice of counts is returned, where each count represents the number of 903// affected rows for the given query at the same index. If an error occurs, 904// counts will be returned up to the query that encountered the error. 905func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 906 return t.BatchUpdateWithOptions(ctx, stmts, QueryOptions{}) 907} 908 909// BatchUpdateWithOptions groups one or more DML statements and sends them to 910// Spanner in a single RPC. This is an efficient way to execute multiple DML 911// statements. 912// 913// A slice of counts is returned, where each count represents the number of 914// affected rows for the given query at the same index. If an error occurs, 915// counts will be returned up to the query that encountered the error. 916// 917// The request tag and priority given in the QueryOptions are included with the 918// RPC. Any other options that are set in the QueryOptions struct are ignored. 919func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { 920 return t.batchUpdateWithOptions(ctx, stmts, t.qo.merge(opts)) 921} 922 923func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { 924 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 925 defer func() { trace.EndSpan(ctx, err) }() 926 927 sh, ts, err := t.acquire(ctx) 928 if err != nil { 929 return nil, err 930 } 931 // Cloud Spanner will return "Session not found" on bad sessions. 932 sid := sh.getID() 933 if sid == "" { 934 // Might happen if transaction is closed in the middle of a API call. 935 return nil, errSessionClosed(sh) 936 } 937 938 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 939 for _, st := range stmts { 940 params, paramTypes, err := st.convertParams() 941 if err != nil { 942 return nil, err 943 } 944 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 945 Sql: st.SQL, 946 Params: params, 947 ParamTypes: paramTypes, 948 }) 949 } 950 951 resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{ 952 Session: sh.getID(), 953 Transaction: ts, 954 Statements: sppbStmts, 955 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 956 RequestOptions: createRequestOptions(opts.Priority, opts.RequestTag, t.txOpts.TransactionTag), 957 }) 958 if err != nil { 959 return nil, ToSpannerError(err) 960 } 961 962 var counts []int64 963 for _, rs := range resp.ResultSets { 964 count, err := extractRowCount(rs.Stats) 965 if err != nil { 966 return nil, err 967 } 968 counts = append(counts, count) 969 } 970 if resp.Status != nil && resp.Status.Code != 0 { 971 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 972 } 973 return counts, nil 974} 975 976// acquire implements txReadEnv.acquire. 977func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 978 ts := &sppb.TransactionSelector{ 979 Selector: &sppb.TransactionSelector_Id{ 980 Id: t.tx, 981 }, 982 } 983 t.mu.Lock() 984 defer t.mu.Unlock() 985 switch t.state { 986 case txClosed: 987 return nil, nil, errTxClosed() 988 case txActive: 989 return t.sh, ts, nil 990 } 991 return nil, nil, errUnexpectedTxState(t.state) 992} 993 994// release implements txReadEnv.release. 995func (t *ReadWriteTransaction) release(err error) { 996 t.mu.Lock() 997 sh := t.sh 998 t.mu.Unlock() 999 if sh != nil && isSessionNotFoundError(err) { 1000 sh.destroy() 1001 } 1002} 1003 1004func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { 1005 res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 1006 Session: sid, 1007 Options: &sppb.TransactionOptions{ 1008 Mode: &sppb.TransactionOptions_ReadWrite_{ 1009 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1010 }, 1011 }, 1012 }) 1013 if err != nil { 1014 return nil, err 1015 } 1016 if res.Id == nil { 1017 return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.") 1018 } 1019 return res.Id, nil 1020} 1021 1022// begin starts a read-write transacton on Cloud Spanner, it is always called 1023// before any of the public APIs. 1024func (t *ReadWriteTransaction) begin(ctx context.Context) error { 1025 if t.tx != nil { 1026 t.state = txActive 1027 return nil 1028 } 1029 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 1030 if err == nil { 1031 t.tx = tx 1032 t.state = txActive 1033 return nil 1034 } 1035 if isSessionNotFoundError(err) { 1036 t.sh.destroy() 1037 } 1038 return err 1039} 1040 1041// CommitResponse provides a response of a transaction commit in a database. 1042type CommitResponse struct { 1043 // CommitTs is the commit time for a transaction. 1044 CommitTs time.Time 1045 // CommitStats is the commit statistics for a transaction. 1046 CommitStats *sppb.CommitResponse_CommitStats 1047} 1048 1049// CommitOptions provides options for commiting a transaction in a database. 1050type CommitOptions struct { 1051 ReturnCommitStats bool 1052} 1053 1054// commit tries to commit a readwrite transaction to Cloud Spanner. It also 1055// returns the commit response for the transactions. 1056func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) { 1057 resp := CommitResponse{} 1058 t.mu.Lock() 1059 t.state = txClosed // No further operations after commit. 1060 mPb, err := mutationsProto(t.wb) 1061 t.mu.Unlock() 1062 if err != nil { 1063 return resp, err 1064 } 1065 1066 // In case that sessionHandle was destroyed but transaction body fails to 1067 // report it. 1068 sid, client := t.sh.getID(), t.sh.getClient() 1069 if sid == "" || client == nil { 1070 return resp, errSessionClosed(t.sh) 1071 } 1072 1073 res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{ 1074 Session: sid, 1075 Transaction: &sppb.CommitRequest_TransactionId{ 1076 TransactionId: t.tx, 1077 }, 1078 RequestOptions: createRequestOptions(t.txOpts.CommitPriority, "", t.txOpts.TransactionTag), 1079 Mutations: mPb, 1080 ReturnCommitStats: options.ReturnCommitStats, 1081 }) 1082 if e != nil { 1083 return resp, toSpannerErrorWithCommitInfo(e, true) 1084 } 1085 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1086 resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1087 } 1088 if options.ReturnCommitStats { 1089 resp.CommitStats = res.CommitStats 1090 } 1091 if isSessionNotFoundError(err) { 1092 t.sh.destroy() 1093 } 1094 return resp, err 1095} 1096 1097// rollback is called when a commit is aborted or the transaction body runs 1098// into error. 1099func (t *ReadWriteTransaction) rollback(ctx context.Context) { 1100 t.mu.Lock() 1101 // Forbid further operations on rollbacked transaction. 1102 t.state = txClosed 1103 t.mu.Unlock() 1104 // In case that sessionHandle was destroyed but transaction body fails to 1105 // report it. 1106 sid, client := t.sh.getID(), t.sh.getClient() 1107 if sid == "" || client == nil { 1108 return 1109 } 1110 err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{ 1111 Session: sid, 1112 TransactionId: t.tx, 1113 }) 1114 if isSessionNotFoundError(err) { 1115 t.sh.destroy() 1116 } 1117} 1118 1119// runInTransaction executes f under a read-write transaction context. 1120func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) { 1121 var ( 1122 resp CommitResponse 1123 err error 1124 errDuringCommit bool 1125 ) 1126 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 1127 // Try to commit if transaction body returns no error. 1128 resp, err = t.commit(ctx, t.txOpts.CommitOptions) 1129 errDuringCommit = err != nil 1130 } 1131 if err != nil { 1132 if isAbortedErr(err) { 1133 // Retry the transaction using the same session on ABORT error. 1134 // Cloud Spanner will create the new transaction with the previous 1135 // one's wound-wait priority. 1136 return resp, err 1137 } 1138 if isSessionNotFoundError(err) { 1139 t.sh.destroy() 1140 return resp, err 1141 } 1142 // Rollback the transaction unless the error occurred during the 1143 // commit. Executing a rollback after a commit has failed will 1144 // otherwise cause an error. Note that transient errors, such as 1145 // UNAVAILABLE, are already handled in the gRPC layer and do not show 1146 // up here. Context errors (deadline exceeded / canceled) during 1147 // commits are also not rolled back. 1148 if !errDuringCommit { 1149 t.rollback(ctx) 1150 } 1151 return resp, err 1152 } 1153 // err == nil, return commit response. 1154 return resp, nil 1155} 1156 1157// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in 1158// order to run a read-write transaction in a statement-based way. 1159// 1160// This struct is returned by NewReadWriteStmtBasedTransaction and contains 1161// Commit() and Rollback() methods to end a transaction. 1162type ReadWriteStmtBasedTransaction struct { 1163 // ReadWriteTransaction contains methods for performing transactional reads. 1164 ReadWriteTransaction 1165 1166 options TransactionOptions 1167} 1168 1169// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or 1170// Rollback() must be called to end a transaction. If Commit() or Rollback() is 1171// not called, the session that is used by the transaction will not be returned 1172// to the pool and cause a session leak. 1173// 1174// This method should only be used when manual error handling and retry 1175// management is needed. Cloud Spanner may abort a read/write transaction at any 1176// moment, and each statement that is executed on the transaction should be 1177// checked for an Aborted error, including queries and read operations. 1178// 1179// For most use cases, client.ReadWriteTransaction should be used, as it will 1180// handle all Aborted and 'Session not found' errors automatically. 1181func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) { 1182 return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{}) 1183} 1184 1185// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction 1186// with configurable options. Commit() or Rollback() must be called to end a 1187// transaction. If Commit() or Rollback() is not called, the session that is 1188// used by the transaction will not be returned to the pool and cause a session 1189// leak. 1190// 1191// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of 1192// NewReadWriteStmtBasedTransaction. 1193func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) { 1194 var ( 1195 sh *sessionHandle 1196 err error 1197 t *ReadWriteStmtBasedTransaction 1198 ) 1199 sh, err = c.idleSessions.takeWriteSession(ctx) 1200 if err != nil { 1201 // If session retrieval fails, just fail the transaction. 1202 return nil, err 1203 } 1204 t = &ReadWriteStmtBasedTransaction{ 1205 ReadWriteTransaction: ReadWriteTransaction{ 1206 tx: sh.getTransactionID(), 1207 }, 1208 } 1209 t.txReadOnly.sh = sh 1210 t.txReadOnly.txReadEnv = t 1211 t.txReadOnly.qo = c.qo 1212 t.txOpts = options 1213 1214 if err = t.begin(ctx); err != nil { 1215 if sh != nil { 1216 sh.recycle() 1217 } 1218 return nil, err 1219 } 1220 return t, err 1221} 1222 1223// Commit tries to commit a readwrite transaction to Cloud Spanner. It also 1224// returns the commit timestamp for the transactions. 1225func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) { 1226 resp, err := t.CommitWithReturnResp(ctx) 1227 return resp.CommitTs, err 1228} 1229 1230// CommitWithReturnResp tries to commit a readwrite transaction. It also returns 1231// the commit timestamp and stats for the transactions. 1232func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context) (CommitResponse, error) { 1233 resp, err := t.commit(ctx, t.txOpts.CommitOptions) 1234 // Rolling back an aborted transaction is not necessary. 1235 if err != nil && status.Code(err) != codes.Aborted { 1236 t.rollback(ctx) 1237 } 1238 if t.sh != nil { 1239 t.sh.recycle() 1240 } 1241 return resp, err 1242} 1243 1244// Rollback is called to cancel the ongoing transaction that has not been 1245// committed yet. 1246func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) { 1247 t.rollback(ctx) 1248 if t.sh != nil { 1249 t.sh.recycle() 1250 } 1251} 1252 1253// writeOnlyTransaction provides the most efficient way of doing write-only 1254// transactions. It essentially does blind writes to Cloud Spanner. 1255type writeOnlyTransaction struct { 1256 // sp is the session pool which writeOnlyTransaction uses to get Cloud 1257 // Spanner sessions for blind writes. 1258 sp *sessionPool 1259 // transactionTag is the tag that will be included with the CommitRequest 1260 // of the write-only transaction. 1261 transactionTag string 1262 // commitPriority is the RPC priority to use for the commit operation. 1263 commitPriority sppb.RequestOptions_Priority 1264} 1265 1266// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 1267// unless one of the following happens: 1268// 1269// 1) Context times out. 1270// 2) An unretryable error (e.g. database not found) occurs. 1271// 3) There is a malformed Mutation object. 1272func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 1273 var ( 1274 ts time.Time 1275 sh *sessionHandle 1276 ) 1277 mPb, err := mutationsProto(ms) 1278 if err != nil { 1279 // Malformed mutation found, just return the error. 1280 return ts, err 1281 } 1282 1283 // Retry-loop for aborted transactions. 1284 // TODO: Replace with generic retryer. 1285 for { 1286 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 1287 // No usable session for doing the commit, take one from pool. 1288 sh, err = t.sp.take(ctx) 1289 if err != nil { 1290 // sessionPool.Take already retries for session 1291 // creations/retrivals. 1292 return ts, err 1293 } 1294 defer sh.recycle() 1295 } 1296 res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 1297 Session: sh.getID(), 1298 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 1299 SingleUseTransaction: &sppb.TransactionOptions{ 1300 Mode: &sppb.TransactionOptions_ReadWrite_{ 1301 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1302 }, 1303 }, 1304 }, 1305 Mutations: mPb, 1306 RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag), 1307 }) 1308 if err != nil && !isAbortedErr(err) { 1309 if isSessionNotFoundError(err) { 1310 // Discard the bad session. 1311 sh.destroy() 1312 } 1313 return ts, toSpannerErrorWithCommitInfo(err, true) 1314 } else if err == nil { 1315 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1316 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1317 } 1318 break 1319 } 1320 } 1321 return ts, ToSpannerError(err) 1322} 1323 1324// isAbortedErr returns true if the error indicates that an gRPC call is 1325// aborted on the server side. 1326func isAbortedErr(err error) bool { 1327 if err == nil { 1328 return false 1329 } 1330 if ErrCode(err) == codes.Aborted { 1331 return true 1332 } 1333 return false 1334} 1335