1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 vkit "cloud.google.com/go/spanner/apiv1" 27 "github.com/golang/protobuf/proto" 28 "google.golang.org/api/iterator" 29 sppb "google.golang.org/genproto/googleapis/spanner/v1" 30 "google.golang.org/grpc/codes" 31 "google.golang.org/grpc/status" 32) 33 34// transactionID stores a transaction ID which uniquely identifies a transaction 35// in Cloud Spanner. 36type transactionID []byte 37 38// txReadEnv manages a read-transaction environment consisting of a session 39// handle and a transaction selector. 40type txReadEnv interface { 41 // acquire returns a read-transaction environment that can be used to 42 // perform a transactional read. 43 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 44 // sets the transaction's read timestamp 45 setTimestamp(time.Time) 46 // release should be called at the end of every transactional read to deal 47 // with session recycling. 48 release(error) 49} 50 51// txReadOnly contains methods for doing transactional reads. 52type txReadOnly struct { 53 // read-transaction environment for performing transactional read 54 // operations. 55 txReadEnv 56 57 // Atomic. Only needed for DML statements, but used forall. 58 sequenceNumber int64 59 60 // replaceSessionFunc is a function that can be called to replace the 61 // session that is used by the transaction. This function should only be 62 // defined for single-use transactions that can safely be retried on a 63 // different session. All other transactions will set this function to nil. 64 replaceSessionFunc func(ctx context.Context) error 65 66 // sp is the session pool for allocating a session to execute the read-only 67 // transaction. It is set only once during initialization of the 68 // txReadOnly. 69 sp *sessionPool 70 // sh is the sessionHandle allocated from sp. 71 sh *sessionHandle 72 73 // qo provides options for executing a sql query. 74 qo QueryOptions 75} 76 77// errSessionClosed returns error for using a recycled/destroyed session 78func errSessionClosed(sh *sessionHandle) error { 79 return spannerErrorf(codes.FailedPrecondition, 80 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 81} 82 83// Read returns a RowIterator for reading multiple rows from the database. 84func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 85 return t.ReadWithOptions(ctx, table, keys, columns, nil) 86} 87 88// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 89func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 90 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 91} 92 93// ReadOptions provides options for reading rows from a database. 94type ReadOptions struct { 95 // The index to use for reading. If non-empty, you can only read columns 96 // that are part of the index key, part of the primary key, or stored in the 97 // index due to a STORING clause in the index definition. 98 Index string 99 100 // The maximum number of rows to read. A limit value less than 1 means no 101 // limit. 102 Limit int 103} 104 105// ReadWithOptions returns a RowIterator for reading multiple rows from the 106// database. Pass a ReadOptions to modify the read operation. 107func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 108 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 109 defer func() { trace.EndSpan(ctx, ri.err) }() 110 var ( 111 sh *sessionHandle 112 ts *sppb.TransactionSelector 113 err error 114 ) 115 kset, err := keys.keySetProto() 116 if err != nil { 117 return &RowIterator{err: err} 118 } 119 if sh, ts, err = t.acquire(ctx); err != nil { 120 return &RowIterator{err: err} 121 } 122 // Cloud Spanner will return "Session not found" on bad sessions. 123 client := sh.getClient() 124 if client == nil { 125 // Might happen if transaction is closed in the middle of a API call. 126 return &RowIterator{err: errSessionClosed(sh)} 127 } 128 index := "" 129 limit := 0 130 if opts != nil { 131 index = opts.Index 132 if opts.Limit > 0 { 133 limit = opts.Limit 134 } 135 } 136 return streamWithReplaceSessionFunc( 137 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 138 sh.session.logger, 139 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 140 return client.StreamingRead(ctx, 141 &sppb.ReadRequest{ 142 Session: t.sh.getID(), 143 Transaction: ts, 144 Table: table, 145 Index: index, 146 Columns: columns, 147 KeySet: kset, 148 ResumeToken: resumeToken, 149 Limit: int64(limit), 150 }) 151 }, 152 t.replaceSessionFunc, 153 t.setTimestamp, 154 t.release, 155 ) 156} 157 158// errRowNotFound returns error for not being able to read the row identified by 159// key. 160func errRowNotFound(table string, key Key) error { 161 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 162} 163 164// errRowNotFoundByIndex returns error for not being able to read the row by index. 165func errRowNotFoundByIndex(table string, key Key, index string) error { 166 return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 167} 168 169// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index. 170func errMultipleRowsFound(table string, key Key, index string) error { 171 return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 172} 173 174// ReadRow reads a single row from the database. 175// 176// If no row is present with the given key, then ReadRow returns an error where 177// spanner.ErrCode(err) is codes.NotFound. 178func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 179 iter := t.Read(ctx, table, key, columns) 180 defer iter.Stop() 181 row, err := iter.Next() 182 switch err { 183 case iterator.Done: 184 return nil, errRowNotFound(table, key) 185 case nil: 186 return row, nil 187 default: 188 return nil, err 189 } 190} 191 192// ReadRowUsingIndex reads a single row from the database using an index. 193// 194// If no row is present with the given index, then ReadRowUsingIndex returns an 195// error where spanner.ErrCode(err) is codes.NotFound. 196// 197// If more than one row received with the given index, then ReadRowUsingIndex 198// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition. 199func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) { 200 iter := t.ReadUsingIndex(ctx, table, index, key, columns) 201 defer iter.Stop() 202 row, err := iter.Next() 203 switch err { 204 case iterator.Done: 205 return nil, errRowNotFoundByIndex(table, key, index) 206 case nil: 207 // If more than one row found, return an error. 208 _, err := iter.Next() 209 switch err { 210 case iterator.Done: 211 return row, nil 212 case nil: 213 return nil, errMultipleRowsFound(table, key, index) 214 default: 215 return nil, err 216 } 217 default: 218 return nil, err 219 } 220} 221 222// QueryOptions provides options for executing a sql query from a database. 223type QueryOptions struct { 224 Mode *sppb.ExecuteSqlRequest_QueryMode 225 Options *sppb.ExecuteSqlRequest_QueryOptions 226} 227 228// merge combines two QueryOptions that the input parameter will have higher 229// order of precedence. 230func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { 231 merged := QueryOptions{ 232 Mode: qo.Mode, 233 Options: &sppb.ExecuteSqlRequest_QueryOptions{}, 234 } 235 if opts.Mode != nil { 236 merged.Mode = opts.Mode 237 } 238 proto.Merge(merged.Options, qo.Options) 239 proto.Merge(merged.Options, opts.Options) 240 return merged 241} 242 243// Query executes a query against the database. It returns a RowIterator for 244// retrieving the resulting rows. 245// 246// Query returns only row data, without a query plan or execution statistics. 247// Use QueryWithStats to get rows along with the plan and statistics. Use 248// AnalyzeQuery to get just the plan. 249func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 250 mode := sppb.ExecuteSqlRequest_NORMAL 251 return t.query(ctx, statement, QueryOptions{ 252 Mode: &mode, 253 Options: t.qo.Options, 254 }) 255} 256 257// QueryWithOptions executes a SQL statment against the database. It returns 258// a RowIterator for retrieving the resulting rows. The sql query execution 259// will be optimized based on the given query options. 260func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator { 261 return t.query(ctx, statement, t.qo.merge(opts)) 262} 263 264// QueryWithStats executes a SQL statement against the database. It returns 265// a RowIterator for retrieving the resulting rows. The RowIterator will also 266// be populated with a query plan and execution statistics. 267func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 268 mode := sppb.ExecuteSqlRequest_PROFILE 269 return t.query(ctx, statement, QueryOptions{ 270 Mode: &mode, 271 Options: t.qo.Options, 272 }) 273} 274 275// AnalyzeQuery returns the query plan for statement. 276func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 277 mode := sppb.ExecuteSqlRequest_PLAN 278 iter := t.query(ctx, statement, QueryOptions{ 279 Mode: &mode, 280 Options: t.qo.Options, 281 }) 282 defer iter.Stop() 283 for { 284 _, err := iter.Next() 285 if err == iterator.Done { 286 break 287 } 288 if err != nil { 289 return nil, err 290 } 291 } 292 if iter.QueryPlan == nil { 293 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 294 } 295 return iter.QueryPlan, nil 296} 297 298func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) { 299 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 300 defer func() { trace.EndSpan(ctx, ri.err) }() 301 req, sh, err := t.prepareExecuteSQL(ctx, statement, options) 302 if err != nil { 303 return &RowIterator{err: err} 304 } 305 client := sh.getClient() 306 return streamWithReplaceSessionFunc( 307 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 308 sh.session.logger, 309 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 310 req.ResumeToken = resumeToken 311 req.Session = t.sh.getID() 312 return client.ExecuteStreamingSql(ctx, req) 313 }, 314 t.replaceSessionFunc, 315 t.setTimestamp, 316 t.release) 317} 318 319func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 320 sh, ts, err := t.acquire(ctx) 321 if err != nil { 322 return nil, nil, err 323 } 324 // Cloud Spanner will return "Session not found" on bad sessions. 325 sid := sh.getID() 326 if sid == "" { 327 // Might happen if transaction is closed in the middle of a API call. 328 return nil, nil, errSessionClosed(sh) 329 } 330 params, paramTypes, err := stmt.convertParams() 331 if err != nil { 332 return nil, nil, err 333 } 334 mode := sppb.ExecuteSqlRequest_NORMAL 335 if options.Mode != nil { 336 mode = *options.Mode 337 } 338 req := &sppb.ExecuteSqlRequest{ 339 Session: sid, 340 Transaction: ts, 341 Sql: stmt.SQL, 342 QueryMode: mode, 343 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 344 Params: params, 345 ParamTypes: paramTypes, 346 QueryOptions: options.Options, 347 } 348 return req, sh, nil 349} 350 351// txState is the status of a transaction. 352type txState int 353 354const ( 355 // transaction is new, waiting to be initialized.. 356 txNew txState = iota 357 // transaction is being initialized. 358 txInit 359 // transaction is active and can perform read/write. 360 txActive 361 // transaction is closed, cannot be used anymore. 362 txClosed 363) 364 365// errRtsUnavailable returns error for read transaction's read timestamp being 366// unavailable. 367func errRtsUnavailable() error { 368 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 369} 370 371// errTxClosed returns error for using a closed transaction. 372func errTxClosed() error { 373 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 374} 375 376// errUnexpectedTxState returns error for transaction enters an unexpected state. 377func errUnexpectedTxState(ts txState) error { 378 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 379} 380 381// ReadOnlyTransaction provides a snapshot transaction with guaranteed 382// consistency across reads, but does not allow writes. Read-only transactions 383// can be configured to read at timestamps in the past. 384// 385// Read-only transactions do not take locks. Instead, they work by choosing a 386// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 387// they do not acquire locks, they do not block concurrent read-write 388// transactions. 389// 390// Unlike locking read-write transactions, read-only transactions never abort. 391// They can fail if the chosen read timestamp is garbage collected; however, the 392// default garbage collection policy is generous enough that most applications 393// do not need to worry about this in practice. See the documentation of 394// TimestampBound for more details. 395// 396// A ReadOnlyTransaction consumes resources on the server until Close is called. 397type ReadOnlyTransaction struct { 398 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 399 mu sync.Mutex 400 // txReadOnly contains methods for performing transactional reads. 401 txReadOnly 402 // singleUse indicates that the transaction can be used for only one read. 403 singleUse bool 404 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 405 // ReadOnlyTransaction. 406 tx transactionID 407 // txReadyOrClosed is for broadcasting that transaction ID has been returned 408 // by Cloud Spanner or that transaction is closed. 409 txReadyOrClosed chan struct{} 410 // state is the current transaction status of the ReadOnly transaction. 411 state txState 412 // rts is the read timestamp returned by transactional reads. 413 rts time.Time 414 // tb is the read staleness bound specification for transactional reads. 415 tb TimestampBound 416} 417 418// errTxInitTimeout returns error for timeout in waiting for initialization of 419// the transaction. 420func errTxInitTimeout() error { 421 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 422} 423 424// getTimestampBound returns the read staleness bound specified for the 425// ReadOnlyTransaction. 426func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 427 t.mu.Lock() 428 defer t.mu.Unlock() 429 return t.tb 430} 431 432// begin starts a snapshot read-only Transaction on Cloud Spanner. 433func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 434 var ( 435 locked bool 436 tx transactionID 437 rts time.Time 438 sh *sessionHandle 439 err error 440 res *sppb.Transaction 441 ) 442 defer func() { 443 if !locked { 444 t.mu.Lock() 445 // Not necessary, just to make it clear that t.mu is being held when 446 // locked == true. 447 locked = true 448 } 449 if t.state != txClosed { 450 // Signal other initialization routines. 451 close(t.txReadyOrClosed) 452 t.txReadyOrClosed = make(chan struct{}) 453 } 454 t.mu.Unlock() 455 if err != nil && sh != nil { 456 // Got a valid session handle, but failed to initialize transaction= 457 // on Cloud Spanner. 458 if isSessionNotFoundError(err) { 459 sh.destroy() 460 } 461 // If sh.destroy was already executed, this becomes a noop. 462 sh.recycle() 463 } 464 }() 465 // Retry the BeginTransaction call if a 'Session not found' is returned. 466 for { 467 sh, err = t.sp.take(ctx) 468 if err != nil { 469 return err 470 } 471 res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 472 Session: sh.getID(), 473 Options: &sppb.TransactionOptions{ 474 Mode: &sppb.TransactionOptions_ReadOnly_{ 475 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 476 }, 477 }, 478 }) 479 if isSessionNotFoundError(err) { 480 sh.destroy() 481 continue 482 } else if err == nil { 483 tx = res.Id 484 if res.ReadTimestamp != nil { 485 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 486 } 487 } else { 488 err = toSpannerError(err) 489 } 490 break 491 } 492 t.mu.Lock() 493 494 // defer function will be executed with t.mu being held. 495 locked = true 496 497 // During the execution of t.begin(), t.Close() was invoked. 498 if t.state == txClosed { 499 return errSessionClosed(sh) 500 } 501 502 // If begin() fails, this allows other queries to take over the 503 // initialization. 504 t.tx = nil 505 if err == nil { 506 t.tx = tx 507 t.rts = rts 508 t.sh = sh 509 // State transite to txActive. 510 t.state = txActive 511 } 512 return err 513} 514 515// acquire implements txReadEnv.acquire. 516func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 517 if err := checkNestedTxn(ctx); err != nil { 518 return nil, nil, err 519 } 520 if t.singleUse { 521 return t.acquireSingleUse(ctx) 522 } 523 return t.acquireMultiUse(ctx) 524} 525 526func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 527 t.mu.Lock() 528 defer t.mu.Unlock() 529 switch t.state { 530 case txClosed: 531 // A closed single-use transaction can never be reused. 532 return nil, nil, errTxClosed() 533 case txNew: 534 t.state = txClosed 535 ts := &sppb.TransactionSelector{ 536 Selector: &sppb.TransactionSelector_SingleUse{ 537 SingleUse: &sppb.TransactionOptions{ 538 Mode: &sppb.TransactionOptions_ReadOnly_{ 539 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 540 }, 541 }, 542 }, 543 } 544 sh, err := t.sp.take(ctx) 545 if err != nil { 546 return nil, nil, err 547 } 548 549 // Install session handle into t, which can be used for readonly 550 // operations later. 551 t.sh = sh 552 return sh, ts, nil 553 } 554 us := t.state 555 556 // SingleUse transaction should only be in either txNew state or txClosed 557 // state. 558 return nil, nil, errUnexpectedTxState(us) 559} 560 561func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 562 for { 563 t.mu.Lock() 564 switch t.state { 565 case txClosed: 566 t.mu.Unlock() 567 return nil, nil, errTxClosed() 568 case txNew: 569 // State transit to txInit so that no further TimestampBound change 570 // is accepted. 571 t.state = txInit 572 t.mu.Unlock() 573 continue 574 case txInit: 575 if t.tx != nil { 576 // Wait for a transaction ID to become ready. 577 txReadyOrClosed := t.txReadyOrClosed 578 t.mu.Unlock() 579 select { 580 case <-txReadyOrClosed: 581 // Need to check transaction state again. 582 continue 583 case <-ctx.Done(): 584 // The waiting for initialization is timeout, return error 585 // directly. 586 return nil, nil, errTxInitTimeout() 587 } 588 } 589 // Take the ownership of initializing the transaction. 590 t.tx = transactionID{} 591 t.mu.Unlock() 592 // Begin a read-only transaction. 593 // 594 // TODO: consider adding a transaction option which allow queries to 595 // initiate transactions by themselves. Note that this option might 596 // not be always good because the ID of the new transaction won't 597 // be ready till the query returns some data or completes. 598 if err := t.begin(ctx); err != nil { 599 return nil, nil, err 600 } 601 602 // If t.begin() succeeded, t.state should have been changed to 603 // txActive, so we can just continue here. 604 continue 605 case txActive: 606 sh := t.sh 607 ts := &sppb.TransactionSelector{ 608 Selector: &sppb.TransactionSelector_Id{ 609 Id: t.tx, 610 }, 611 } 612 t.mu.Unlock() 613 return sh, ts, nil 614 } 615 state := t.state 616 t.mu.Unlock() 617 return nil, nil, errUnexpectedTxState(state) 618 } 619} 620 621func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 622 t.mu.Lock() 623 defer t.mu.Unlock() 624 if t.rts.IsZero() { 625 t.rts = ts 626 } 627} 628 629// release implements txReadEnv.release. 630func (t *ReadOnlyTransaction) release(err error) { 631 t.mu.Lock() 632 sh := t.sh 633 t.mu.Unlock() 634 if sh != nil { // sh could be nil if t.acquire() fails. 635 if isSessionNotFoundError(err) { 636 sh.destroy() 637 } 638 if t.singleUse { 639 // If session handle is already destroyed, this becomes a noop. 640 sh.recycle() 641 } 642 } 643} 644 645// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 646// after being closed. 647func (t *ReadOnlyTransaction) Close() { 648 if t.singleUse { 649 return 650 } 651 t.mu.Lock() 652 if t.state != txClosed { 653 t.state = txClosed 654 close(t.txReadyOrClosed) 655 } 656 sh := t.sh 657 t.mu.Unlock() 658 if sh == nil { 659 return 660 } 661 // If session handle is already destroyed, this becomes a noop. If there are 662 // still active queries and if the recycled session is reused before they 663 // complete, Cloud Spanner will cancel them on behalf of the new transaction 664 // on the session. 665 if sh != nil { 666 sh.recycle() 667 } 668} 669 670// Timestamp returns the timestamp chosen to perform reads and queries in this 671// transaction. The value can only be read after some read or query has either 672// returned some data or completed without returning any data. 673func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 674 t.mu.Lock() 675 defer t.mu.Unlock() 676 if t.rts.IsZero() { 677 return t.rts, errRtsUnavailable() 678 } 679 return t.rts, nil 680} 681 682// WithTimestampBound specifies the TimestampBound to use for read or query. 683// This can only be used before the first read or query is invoked. Note: 684// bounded staleness is not available with general ReadOnlyTransactions; use a 685// single-use ReadOnlyTransaction instead. 686// 687// The returned value is the ReadOnlyTransaction so calls can be chained. 688func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 689 t.mu.Lock() 690 defer t.mu.Unlock() 691 if t.state == txNew { 692 // Only allow to set TimestampBound before the first query. 693 t.tb = tb 694 } 695 return t 696} 697 698// ReadWriteTransaction provides a locking read-write transaction. 699// 700// This type of transaction is the only way to write data into Cloud Spanner; 701// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 702// transactions internally. These transactions rely on pessimistic locking and, 703// if necessary, two-phase commit. Locking read-write transactions may abort, 704// requiring the application to retry. However, the interface exposed by 705// (*Client).ReadWriteTransaction eliminates the need for applications to write 706// retry loops explicitly. 707// 708// Locking transactions may be used to atomically read-modify-write data 709// anywhere in a database. This type of transaction is externally consistent. 710// 711// Clients should attempt to minimize the amount of time a transaction is 712// active. Faster transactions commit with higher probability and cause less 713// contention. Cloud Spanner attempts to keep read locks active as long as the 714// transaction continues to do reads. Long periods of inactivity at the client 715// may cause Cloud Spanner to release a transaction's locks and abort it. 716// 717// Reads performed within a transaction acquire locks on the data being 718// read. Writes can only be done at commit time, after all reads have been 719// completed. Conceptually, a read-write transaction consists of zero or more 720// reads or SQL queries followed by a commit. 721// 722// See (*Client).ReadWriteTransaction for an example. 723// 724// Semantics 725// 726// Cloud Spanner can commit the transaction if all read locks it acquired are 727// still valid at commit time, and it is able to acquire write locks for all 728// writes. Cloud Spanner can abort the transaction for any reason. If a commit 729// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 730// not modified any user data in Cloud Spanner. 731// 732// Unless the transaction commits, Cloud Spanner makes no guarantees about how 733// long the transaction's locks were held for. It is an error to use Cloud 734// Spanner locks for any sort of mutual exclusion other than between Cloud 735// Spanner transactions themselves. 736// 737// Aborted transactions 738// 739// Application code does not need to retry explicitly; RunInTransaction will 740// automatically retry a transaction if an attempt results in an abort. The lock 741// priority of a transaction increases after each prior aborted transaction, 742// meaning that the next attempt has a slightly better chance of success than 743// before. 744// 745// Under some circumstances (e.g., many transactions attempting to modify the 746// same row(s)), a transaction can abort many times in a short period before 747// successfully committing. Thus, it is not a good idea to cap the number of 748// retries a transaction can attempt; instead, it is better to limit the total 749// amount of wall time spent retrying. 750// 751// Idle transactions 752// 753// A transaction is considered idle if it has no outstanding reads or SQL 754// queries and has not started a read or SQL query within the last 10 755// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 756// hold on to locks indefinitely. In that case, the commit will fail with error 757// ABORTED. 758// 759// If this behavior is undesirable, periodically executing a simple SQL query 760// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 761// idle. 762type ReadWriteTransaction struct { 763 // txReadOnly contains methods for performing transactional reads. 764 txReadOnly 765 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 766 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 767 // during the initialization of ReadWriteTransaction. 768 tx transactionID 769 // mu protects concurrent access to the internal states of 770 // ReadWriteTransaction. 771 mu sync.Mutex 772 // state is the current transaction status of the read-write transaction. 773 state txState 774 // wb is the set of buffered mutations waiting to be committed. 775 wb []*Mutation 776} 777 778// BufferWrite adds a list of mutations to the set of updates that will be 779// applied when the transaction is committed. It does not actually apply the 780// write until the transaction is committed, so the operation does not block. 781// The effects of the write won't be visible to any reads (including reads done 782// in the same transaction) until the transaction commits. 783// 784// See the example for Client.ReadWriteTransaction. 785func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 786 t.mu.Lock() 787 defer t.mu.Unlock() 788 if t.state == txClosed { 789 return errTxClosed() 790 } 791 if t.state != txActive { 792 return errUnexpectedTxState(t.state) 793 } 794 t.wb = append(t.wb, ms...) 795 return nil 796} 797 798// Update executes a DML statement against the database. It returns the number 799// of affected rows. Update returns an error if the statement is a query. 800// However, the query is executed, and any data read will be validated upon 801// commit. 802func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 803 mode := sppb.ExecuteSqlRequest_NORMAL 804 return t.update(ctx, stmt, QueryOptions{ 805 Mode: &mode, 806 Options: t.qo.Options, 807 }) 808} 809 810// UpdateWithOptions executes a DML statement against the database. It returns 811// the number of affected rows. The sql query execution will be optimized 812// based on the given query options. 813func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 814 return t.update(ctx, stmt, t.qo.merge(opts)) 815} 816 817func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 818 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 819 defer func() { trace.EndSpan(ctx, err) }() 820 req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts) 821 if err != nil { 822 return 0, err 823 } 824 resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) 825 if err != nil { 826 return 0, toSpannerError(err) 827 } 828 if resultSet.Stats == nil { 829 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 830 } 831 return extractRowCount(resultSet.Stats) 832} 833 834// BatchUpdate groups one or more DML statements and sends them to Spanner in a 835// single RPC. This is an efficient way to execute multiple DML statements. 836// 837// A slice of counts is returned, where each count represents the number of 838// affected rows for the given query at the same index. If an error occurs, 839// counts will be returned up to the query that encountered the error. 840func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 841 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 842 defer func() { trace.EndSpan(ctx, err) }() 843 844 sh, ts, err := t.acquire(ctx) 845 if err != nil { 846 return nil, err 847 } 848 // Cloud Spanner will return "Session not found" on bad sessions. 849 sid := sh.getID() 850 if sid == "" { 851 // Might happen if transaction is closed in the middle of a API call. 852 return nil, errSessionClosed(sh) 853 } 854 855 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 856 for _, st := range stmts { 857 params, paramTypes, err := st.convertParams() 858 if err != nil { 859 return nil, err 860 } 861 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 862 Sql: st.SQL, 863 Params: params, 864 ParamTypes: paramTypes, 865 }) 866 } 867 868 resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{ 869 Session: sh.getID(), 870 Transaction: ts, 871 Statements: sppbStmts, 872 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 873 }) 874 if err != nil { 875 return nil, toSpannerError(err) 876 } 877 878 var counts []int64 879 for _, rs := range resp.ResultSets { 880 count, err := extractRowCount(rs.Stats) 881 if err != nil { 882 return nil, err 883 } 884 counts = append(counts, count) 885 } 886 if resp.Status != nil && resp.Status.Code != 0 { 887 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 888 } 889 return counts, nil 890} 891 892// acquire implements txReadEnv.acquire. 893func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 894 ts := &sppb.TransactionSelector{ 895 Selector: &sppb.TransactionSelector_Id{ 896 Id: t.tx, 897 }, 898 } 899 t.mu.Lock() 900 defer t.mu.Unlock() 901 switch t.state { 902 case txClosed: 903 return nil, nil, errTxClosed() 904 case txActive: 905 return t.sh, ts, nil 906 } 907 return nil, nil, errUnexpectedTxState(t.state) 908} 909 910// release implements txReadEnv.release. 911func (t *ReadWriteTransaction) release(err error) { 912 t.mu.Lock() 913 sh := t.sh 914 t.mu.Unlock() 915 if sh != nil && isSessionNotFoundError(err) { 916 sh.destroy() 917 } 918} 919 920func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { 921 res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 922 Session: sid, 923 Options: &sppb.TransactionOptions{ 924 Mode: &sppb.TransactionOptions_ReadWrite_{ 925 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 926 }, 927 }, 928 }) 929 if err != nil { 930 return nil, err 931 } 932 if res.Id == nil { 933 return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.") 934 } 935 return res.Id, nil 936} 937 938// begin starts a read-write transacton on Cloud Spanner, it is always called 939// before any of the public APIs. 940func (t *ReadWriteTransaction) begin(ctx context.Context) error { 941 if t.tx != nil { 942 t.state = txActive 943 return nil 944 } 945 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 946 if err == nil { 947 t.tx = tx 948 t.state = txActive 949 return nil 950 } 951 if isSessionNotFoundError(err) { 952 t.sh.destroy() 953 } 954 return err 955} 956 957// commit tries to commit a readwrite transaction to Cloud Spanner. It also 958// returns the commit timestamp for the transactions. 959func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { 960 var ts time.Time 961 t.mu.Lock() 962 t.state = txClosed // No further operations after commit. 963 mPb, err := mutationsProto(t.wb) 964 t.mu.Unlock() 965 if err != nil { 966 return ts, err 967 } 968 // In case that sessionHandle was destroyed but transaction body fails to 969 // report it. 970 sid, client := t.sh.getID(), t.sh.getClient() 971 if sid == "" || client == nil { 972 return ts, errSessionClosed(t.sh) 973 } 974 975 res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{ 976 Session: sid, 977 Transaction: &sppb.CommitRequest_TransactionId{ 978 TransactionId: t.tx, 979 }, 980 Mutations: mPb, 981 }) 982 if e != nil { 983 return ts, toSpannerErrorWithCommitInfo(e, true) 984 } 985 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 986 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 987 } 988 if isSessionNotFoundError(err) { 989 t.sh.destroy() 990 } 991 return ts, err 992} 993 994// rollback is called when a commit is aborted or the transaction body runs 995// into error. 996func (t *ReadWriteTransaction) rollback(ctx context.Context) { 997 t.mu.Lock() 998 // Forbid further operations on rollbacked transaction. 999 t.state = txClosed 1000 t.mu.Unlock() 1001 // In case that sessionHandle was destroyed but transaction body fails to 1002 // report it. 1003 sid, client := t.sh.getID(), t.sh.getClient() 1004 if sid == "" || client == nil { 1005 return 1006 } 1007 err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{ 1008 Session: sid, 1009 TransactionId: t.tx, 1010 }) 1011 if isSessionNotFoundError(err) { 1012 t.sh.destroy() 1013 } 1014} 1015 1016// runInTransaction executes f under a read-write transaction context. 1017func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) { 1018 var ( 1019 ts time.Time 1020 err error 1021 errDuringCommit bool 1022 ) 1023 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 1024 // Try to commit if transaction body returns no error. 1025 ts, err = t.commit(ctx) 1026 errDuringCommit = err != nil 1027 } 1028 if err != nil { 1029 if isAbortedErr(err) { 1030 // Retry the transaction using the same session on ABORT error. 1031 // Cloud Spanner will create the new transaction with the previous 1032 // one's wound-wait priority. 1033 return ts, err 1034 } 1035 if isSessionNotFoundError(err) { 1036 t.sh.destroy() 1037 return ts, err 1038 } 1039 // Rollback the transaction unless the error occurred during the 1040 // commit. Executing a rollback after a commit has failed will 1041 // otherwise cause an error. Note that transient errors, such as 1042 // UNAVAILABLE, are already handled in the gRPC layer and do not show 1043 // up here. Context errors (deadline exceeded / canceled) during 1044 // commits are also not rolled back. 1045 if !errDuringCommit { 1046 t.rollback(ctx) 1047 } 1048 return ts, err 1049 } 1050 // err == nil, return commit timestamp. 1051 return ts, nil 1052} 1053 1054// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in 1055// order to run a read-write transaction in a statement-based way. 1056// 1057// This struct is returned by NewReadWriteStmtBasedTransaction and contains 1058// Commit() and Rollback() methods to end a transaction. 1059type ReadWriteStmtBasedTransaction struct { 1060 // ReadWriteTransaction contains methods for performing transactional reads. 1061 ReadWriteTransaction 1062} 1063 1064// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or 1065// Rollback() must be called to end a transaction. If Commit() or Rollback() is 1066// not called, the session that is used by the transaction will not be returned 1067// to the pool and cause a session leak. 1068// 1069// This method should only be used when manual error handling and retry 1070// management is needed. Cloud Spanner may abort a read/write transaction at any 1071// moment, and each statement that is executed on the transaction should be 1072// checked for an Aborted error, including queries and read operations. 1073// 1074// For most use cases, client.ReadWriteTransaction should be used, as it will 1075// handle all Aborted and 'Session not found' errors automatically. 1076func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) { 1077 var ( 1078 sh *sessionHandle 1079 err error 1080 t *ReadWriteStmtBasedTransaction 1081 ) 1082 sh, err = c.idleSessions.takeWriteSession(ctx) 1083 if err != nil { 1084 // If session retrieval fails, just fail the transaction. 1085 return nil, err 1086 } 1087 t = &ReadWriteStmtBasedTransaction{ 1088 ReadWriteTransaction: ReadWriteTransaction{ 1089 tx: sh.getTransactionID(), 1090 }, 1091 } 1092 t.txReadOnly.sh = sh 1093 t.txReadOnly.txReadEnv = t 1094 t.txReadOnly.qo = c.qo 1095 1096 if err = t.begin(ctx); err != nil { 1097 if sh != nil { 1098 sh.recycle() 1099 } 1100 return nil, err 1101 } 1102 return t, err 1103} 1104 1105// Commit tries to commit a readwrite transaction to Cloud Spanner. It also 1106// returns the commit timestamp for the transactions. 1107func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) { 1108 var ( 1109 ts time.Time 1110 err error 1111 ) 1112 ts, err = t.commit(ctx) 1113 // Rolling back an aborted transaction is not necessary. 1114 if err != nil && status.Code(err) != codes.Aborted { 1115 t.rollback(ctx) 1116 } 1117 if t.sh != nil { 1118 t.sh.recycle() 1119 } 1120 return ts, err 1121} 1122 1123// Rollback is called to cancel the ongoing transaction that has not been 1124// committed yet. 1125func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) { 1126 t.rollback(ctx) 1127 if t.sh != nil { 1128 t.sh.recycle() 1129 } 1130} 1131 1132// writeOnlyTransaction provides the most efficient way of doing write-only 1133// transactions. It essentially does blind writes to Cloud Spanner. 1134type writeOnlyTransaction struct { 1135 // sp is the session pool which writeOnlyTransaction uses to get Cloud 1136 // Spanner sessions for blind writes. 1137 sp *sessionPool 1138} 1139 1140// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 1141// unless one of the following happens: 1142// 1143// 1) Context times out. 1144// 2) An unretryable error (e.g. database not found) occurs. 1145// 3) There is a malformed Mutation object. 1146func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 1147 var ( 1148 ts time.Time 1149 sh *sessionHandle 1150 ) 1151 mPb, err := mutationsProto(ms) 1152 if err != nil { 1153 // Malformed mutation found, just return the error. 1154 return ts, err 1155 } 1156 1157 // Retry-loop for aborted transactions. 1158 // TODO: Replace with generic retryer. 1159 for { 1160 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 1161 // No usable session for doing the commit, take one from pool. 1162 sh, err = t.sp.take(ctx) 1163 if err != nil { 1164 // sessionPool.Take already retries for session 1165 // creations/retrivals. 1166 return ts, err 1167 } 1168 defer sh.recycle() 1169 } 1170 res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 1171 Session: sh.getID(), 1172 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 1173 SingleUseTransaction: &sppb.TransactionOptions{ 1174 Mode: &sppb.TransactionOptions_ReadWrite_{ 1175 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1176 }, 1177 }, 1178 }, 1179 Mutations: mPb, 1180 }) 1181 if err != nil && !isAbortedErr(err) { 1182 if isSessionNotFoundError(err) { 1183 // Discard the bad session. 1184 sh.destroy() 1185 } 1186 return ts, toSpannerErrorWithCommitInfo(err, true) 1187 } else if err == nil { 1188 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1189 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1190 } 1191 break 1192 } 1193 } 1194 return ts, toSpannerError(err) 1195} 1196 1197// isAbortedErr returns true if the error indicates that an gRPC call is 1198// aborted on the server side. 1199func isAbortedErr(err error) bool { 1200 if err == nil { 1201 return false 1202 } 1203 if ErrCode(err) == codes.Aborted { 1204 return true 1205 } 1206 return false 1207} 1208