1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 vkit "cloud.google.com/go/spanner/apiv1" 27 "google.golang.org/api/iterator" 28 sppb "google.golang.org/genproto/googleapis/spanner/v1" 29 "google.golang.org/grpc/codes" 30) 31 32// transactionID stores a transaction ID which uniquely identifies a transaction 33// in Cloud Spanner. 34type transactionID []byte 35 36// txReadEnv manages a read-transaction environment consisting of a session 37// handle and a transaction selector. 38type txReadEnv interface { 39 // acquire returns a read-transaction environment that can be used to 40 // perform a transactional read. 41 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 42 // sets the transaction's read timestamp 43 setTimestamp(time.Time) 44 // release should be called at the end of every transactional read to deal 45 // with session recycling. 46 release(error) 47} 48 49// txReadOnly contains methods for doing transactional reads. 50type txReadOnly struct { 51 // read-transaction environment for performing transactional read 52 // operations. 53 txReadEnv 54 55 // Atomic. Only needed for DML statements, but used forall. 56 sequenceNumber int64 57 58 // replaceSessionFunc is a function that can be called to replace the 59 // session that is used by the transaction. This function should only be 60 // defined for single-use transactions that can safely be retried on a 61 // different session. All other transactions will set this function to nil. 62 replaceSessionFunc func(ctx context.Context) error 63 64 // sp is the session pool for allocating a session to execute the read-only 65 // transaction. It is set only once during initialization of the 66 // txReadOnly. 67 sp *sessionPool 68 // sh is the sessionHandle allocated from sp. 69 sh *sessionHandle 70} 71 72// errSessionClosed returns error for using a recycled/destroyed session 73func errSessionClosed(sh *sessionHandle) error { 74 return spannerErrorf(codes.FailedPrecondition, 75 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 76} 77 78// Read returns a RowIterator for reading multiple rows from the database. 79func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 80 return t.ReadWithOptions(ctx, table, keys, columns, nil) 81} 82 83// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 84func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 85 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 86} 87 88// ReadOptions provides options for reading rows from a database. 89type ReadOptions struct { 90 // The index to use for reading. If non-empty, you can only read columns 91 // that are part of the index key, part of the primary key, or stored in the 92 // index due to a STORING clause in the index definition. 93 Index string 94 95 // The maximum number of rows to read. A limit value less than 1 means no 96 // limit. 97 Limit int 98} 99 100// ReadWithOptions returns a RowIterator for reading multiple rows from the 101// database. Pass a ReadOptions to modify the read operation. 102func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 103 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 104 defer func() { trace.EndSpan(ctx, ri.err) }() 105 var ( 106 sh *sessionHandle 107 ts *sppb.TransactionSelector 108 err error 109 ) 110 kset, err := keys.keySetProto() 111 if err != nil { 112 return &RowIterator{err: err} 113 } 114 if sh, ts, err = t.acquire(ctx); err != nil { 115 return &RowIterator{err: err} 116 } 117 // Cloud Spanner will return "Session not found" on bad sessions. 118 sid, client := sh.getID(), sh.getClient() 119 if sid == "" || client == nil { 120 // Might happen if transaction is closed in the middle of a API call. 121 return &RowIterator{err: errSessionClosed(sh)} 122 } 123 index := "" 124 limit := 0 125 if opts != nil { 126 index = opts.Index 127 if opts.Limit > 0 { 128 limit = opts.Limit 129 } 130 } 131 return stream( 132 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 133 sh.session.logger, 134 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 135 return client.StreamingRead(ctx, 136 &sppb.ReadRequest{ 137 Session: sid, 138 Transaction: ts, 139 Table: table, 140 Index: index, 141 Columns: columns, 142 KeySet: kset, 143 ResumeToken: resumeToken, 144 Limit: int64(limit), 145 }) 146 }, 147 t.setTimestamp, 148 t.release, 149 ) 150} 151 152// errRowNotFound returns error for not being able to read the row identified by 153// key. 154func errRowNotFound(table string, key Key) error { 155 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 156} 157 158// errRowNotFoundByIndex returns error for not being able to read the row by index. 159func errRowNotFoundByIndex(table string, key Key, index string) error { 160 return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 161} 162 163// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index. 164func errMultipleRowsFound(table string, key Key, index string) error { 165 return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 166} 167 168// ReadRow reads a single row from the database. 169// 170// If no row is present with the given key, then ReadRow returns an error where 171// spanner.ErrCode(err) is codes.NotFound. 172func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 173 iter := t.Read(ctx, table, key, columns) 174 defer iter.Stop() 175 row, err := iter.Next() 176 switch err { 177 case iterator.Done: 178 return nil, errRowNotFound(table, key) 179 case nil: 180 return row, nil 181 default: 182 return nil, err 183 } 184} 185 186// ReadRowUsingIndex reads a single row from the database using an index. 187// 188// If no row is present with the given index, then ReadRowUsingIndex returns an 189// error where spanner.ErrCode(err) is codes.NotFound. 190// 191// If more than one row received with the given index, then ReadRowUsingIndex 192// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition. 193func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) { 194 iter := t.ReadUsingIndex(ctx, table, index, key, columns) 195 defer iter.Stop() 196 row, err := iter.Next() 197 switch err { 198 case iterator.Done: 199 return nil, errRowNotFoundByIndex(table, key, index) 200 case nil: 201 // If more than one row found, return an error. 202 _, err := iter.Next() 203 switch err { 204 case iterator.Done: 205 return row, nil 206 case nil: 207 return nil, errMultipleRowsFound(table, key, index) 208 default: 209 return nil, err 210 } 211 default: 212 return nil, err 213 } 214} 215 216// Query executes a query against the database. It returns a RowIterator for 217// retrieving the resulting rows. 218// 219// Query returns only row data, without a query plan or execution statistics. 220// Use QueryWithStats to get rows along with the plan and statistics. Use 221// AnalyzeQuery to get just the plan. 222func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 223 return t.query(ctx, statement, sppb.ExecuteSqlRequest_NORMAL) 224} 225 226// Query executes a SQL statement against the database. It returns a RowIterator 227// for retrieving the resulting rows. The RowIterator will also be populated 228// with a query plan and execution statistics. 229func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 230 return t.query(ctx, statement, sppb.ExecuteSqlRequest_PROFILE) 231} 232 233// AnalyzeQuery returns the query plan for statement. 234func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 235 iter := t.query(ctx, statement, sppb.ExecuteSqlRequest_PLAN) 236 defer iter.Stop() 237 for { 238 _, err := iter.Next() 239 if err == iterator.Done { 240 break 241 } 242 if err != nil { 243 return nil, err 244 } 245 } 246 if iter.QueryPlan == nil { 247 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 248 } 249 return iter.QueryPlan, nil 250} 251 252func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.ExecuteSqlRequest_QueryMode) (ri *RowIterator) { 253 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 254 defer func() { trace.EndSpan(ctx, ri.err) }() 255 req, sh, err := t.prepareExecuteSQL(ctx, statement, mode) 256 if err != nil { 257 return &RowIterator{err: err} 258 } 259 client := sh.getClient() 260 return streamWithReplaceSessionFunc( 261 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 262 sh.session.logger, 263 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 264 req.ResumeToken = resumeToken 265 req.Session = t.sh.getID() 266 return client.ExecuteStreamingSql(ctx, req) 267 }, 268 t.replaceSessionFunc, 269 t.setTimestamp, 270 t.release) 271} 272 273func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, mode sppb.ExecuteSqlRequest_QueryMode) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 274 sh, ts, err := t.acquire(ctx) 275 if err != nil { 276 return nil, nil, err 277 } 278 // Cloud Spanner will return "Session not found" on bad sessions. 279 sid := sh.getID() 280 if sid == "" { 281 // Might happen if transaction is closed in the middle of a API call. 282 return nil, nil, errSessionClosed(sh) 283 } 284 params, paramTypes, err := stmt.convertParams() 285 if err != nil { 286 return nil, nil, err 287 } 288 req := &sppb.ExecuteSqlRequest{ 289 Session: sid, 290 Transaction: ts, 291 Sql: stmt.SQL, 292 QueryMode: mode, 293 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 294 Params: params, 295 ParamTypes: paramTypes, 296 } 297 return req, sh, nil 298} 299 300// txState is the status of a transaction. 301type txState int 302 303const ( 304 // transaction is new, waiting to be initialized.. 305 txNew txState = iota 306 // transaction is being initialized. 307 txInit 308 // transaction is active and can perform read/write. 309 txActive 310 // transaction is closed, cannot be used anymore. 311 txClosed 312) 313 314// errRtsUnavailable returns error for read transaction's read timestamp being 315// unavailable. 316func errRtsUnavailable() error { 317 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 318} 319 320// errTxClosed returns error for using a closed transaction. 321func errTxClosed() error { 322 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 323} 324 325// errUnexpectedTxState returns error for transaction enters an unexpected state. 326func errUnexpectedTxState(ts txState) error { 327 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 328} 329 330// ReadOnlyTransaction provides a snapshot transaction with guaranteed 331// consistency across reads, but does not allow writes. Read-only transactions 332// can be configured to read at timestamps in the past. 333// 334// Read-only transactions do not take locks. Instead, they work by choosing a 335// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 336// they do not acquire locks, they do not block concurrent read-write 337// transactions. 338// 339// Unlike locking read-write transactions, read-only transactions never abort. 340// They can fail if the chosen read timestamp is garbage collected; however, the 341// default garbage collection policy is generous enough that most applications 342// do not need to worry about this in practice. See the documentation of 343// TimestampBound for more details. 344// 345// A ReadOnlyTransaction consumes resources on the server until Close is called. 346type ReadOnlyTransaction struct { 347 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 348 mu sync.Mutex 349 // txReadOnly contains methods for performing transactional reads. 350 txReadOnly 351 // singleUse indicates that the transaction can be used for only one read. 352 singleUse bool 353 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 354 // ReadOnlyTransaction. 355 tx transactionID 356 // txReadyOrClosed is for broadcasting that transaction ID has been returned 357 // by Cloud Spanner or that transaction is closed. 358 txReadyOrClosed chan struct{} 359 // state is the current transaction status of the ReadOnly transaction. 360 state txState 361 // rts is the read timestamp returned by transactional reads. 362 rts time.Time 363 // tb is the read staleness bound specification for transactional reads. 364 tb TimestampBound 365} 366 367// errTxInitTimeout returns error for timeout in waiting for initialization of 368// the transaction. 369func errTxInitTimeout() error { 370 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 371} 372 373// getTimestampBound returns the read staleness bound specified for the 374// ReadOnlyTransaction. 375func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 376 t.mu.Lock() 377 defer t.mu.Unlock() 378 return t.tb 379} 380 381// begin starts a snapshot read-only Transaction on Cloud Spanner. 382func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 383 var ( 384 locked bool 385 tx transactionID 386 rts time.Time 387 sh *sessionHandle 388 err error 389 res *sppb.Transaction 390 ) 391 defer func() { 392 if !locked { 393 t.mu.Lock() 394 // Not necessary, just to make it clear that t.mu is being held when 395 // locked == true. 396 locked = true 397 } 398 if t.state != txClosed { 399 // Signal other initialization routines. 400 close(t.txReadyOrClosed) 401 t.txReadyOrClosed = make(chan struct{}) 402 } 403 t.mu.Unlock() 404 if err != nil && sh != nil { 405 // Got a valid session handle, but failed to initialize transaction= 406 // on Cloud Spanner. 407 if isSessionNotFoundError(err) { 408 sh.destroy() 409 } 410 // If sh.destroy was already executed, this becomes a noop. 411 sh.recycle() 412 } 413 }() 414 // Retry the BeginTransaction call if a 'Session not found' is returned. 415 for { 416 sh, err = t.sp.take(ctx) 417 if err != nil { 418 return err 419 } 420 res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 421 Session: sh.getID(), 422 Options: &sppb.TransactionOptions{ 423 Mode: &sppb.TransactionOptions_ReadOnly_{ 424 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 425 }, 426 }, 427 }) 428 if isSessionNotFoundError(err) { 429 sh.destroy() 430 continue 431 } else if err == nil { 432 tx = res.Id 433 if res.ReadTimestamp != nil { 434 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 435 } 436 } else { 437 err = toSpannerError(err) 438 } 439 break 440 } 441 t.mu.Lock() 442 443 // defer function will be executed with t.mu being held. 444 locked = true 445 446 // During the execution of t.begin(), t.Close() was invoked. 447 if t.state == txClosed { 448 return errSessionClosed(sh) 449 } 450 451 // If begin() fails, this allows other queries to take over the 452 // initialization. 453 t.tx = nil 454 if err == nil { 455 t.tx = tx 456 t.rts = rts 457 t.sh = sh 458 // State transite to txActive. 459 t.state = txActive 460 } 461 return err 462} 463 464// acquire implements txReadEnv.acquire. 465func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 466 if err := checkNestedTxn(ctx); err != nil { 467 return nil, nil, err 468 } 469 if t.singleUse { 470 return t.acquireSingleUse(ctx) 471 } 472 return t.acquireMultiUse(ctx) 473} 474 475func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 476 t.mu.Lock() 477 defer t.mu.Unlock() 478 switch t.state { 479 case txClosed: 480 // A closed single-use transaction can never be reused. 481 return nil, nil, errTxClosed() 482 case txNew: 483 t.state = txClosed 484 ts := &sppb.TransactionSelector{ 485 Selector: &sppb.TransactionSelector_SingleUse{ 486 SingleUse: &sppb.TransactionOptions{ 487 Mode: &sppb.TransactionOptions_ReadOnly_{ 488 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 489 }, 490 }, 491 }, 492 } 493 sh, err := t.sp.take(ctx) 494 if err != nil { 495 return nil, nil, err 496 } 497 498 // Install session handle into t, which can be used for readonly 499 // operations later. 500 t.sh = sh 501 return sh, ts, nil 502 } 503 us := t.state 504 505 // SingleUse transaction should only be in either txNew state or txClosed 506 // state. 507 return nil, nil, errUnexpectedTxState(us) 508} 509 510func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 511 for { 512 t.mu.Lock() 513 switch t.state { 514 case txClosed: 515 t.mu.Unlock() 516 return nil, nil, errTxClosed() 517 case txNew: 518 // State transit to txInit so that no further TimestampBound change 519 // is accepted. 520 t.state = txInit 521 t.mu.Unlock() 522 continue 523 case txInit: 524 if t.tx != nil { 525 // Wait for a transaction ID to become ready. 526 txReadyOrClosed := t.txReadyOrClosed 527 t.mu.Unlock() 528 select { 529 case <-txReadyOrClosed: 530 // Need to check transaction state again. 531 continue 532 case <-ctx.Done(): 533 // The waiting for initialization is timeout, return error 534 // directly. 535 return nil, nil, errTxInitTimeout() 536 } 537 } 538 // Take the ownership of initializing the transaction. 539 t.tx = transactionID{} 540 t.mu.Unlock() 541 // Begin a read-only transaction. 542 // 543 // TODO: consider adding a transaction option which allow queries to 544 // initiate transactions by themselves. Note that this option might 545 // not be always good because the ID of the new transaction won't 546 // be ready till the query returns some data or completes. 547 if err := t.begin(ctx); err != nil { 548 return nil, nil, err 549 } 550 551 // If t.begin() succeeded, t.state should have been changed to 552 // txActive, so we can just continue here. 553 continue 554 case txActive: 555 sh := t.sh 556 ts := &sppb.TransactionSelector{ 557 Selector: &sppb.TransactionSelector_Id{ 558 Id: t.tx, 559 }, 560 } 561 t.mu.Unlock() 562 return sh, ts, nil 563 } 564 state := t.state 565 t.mu.Unlock() 566 return nil, nil, errUnexpectedTxState(state) 567 } 568} 569 570func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 571 t.mu.Lock() 572 defer t.mu.Unlock() 573 if t.rts.IsZero() { 574 t.rts = ts 575 } 576} 577 578// release implements txReadEnv.release. 579func (t *ReadOnlyTransaction) release(err error) { 580 t.mu.Lock() 581 sh := t.sh 582 t.mu.Unlock() 583 if sh != nil { // sh could be nil if t.acquire() fails. 584 if isSessionNotFoundError(err) { 585 sh.destroy() 586 } 587 if t.singleUse { 588 // If session handle is already destroyed, this becomes a noop. 589 sh.recycle() 590 } 591 } 592} 593 594// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 595// after being closed. 596func (t *ReadOnlyTransaction) Close() { 597 if t.singleUse { 598 return 599 } 600 t.mu.Lock() 601 if t.state != txClosed { 602 t.state = txClosed 603 close(t.txReadyOrClosed) 604 } 605 sh := t.sh 606 t.mu.Unlock() 607 if sh == nil { 608 return 609 } 610 // If session handle is already destroyed, this becomes a noop. If there are 611 // still active queries and if the recycled session is reused before they 612 // complete, Cloud Spanner will cancel them on behalf of the new transaction 613 // on the session. 614 if sh != nil { 615 sh.recycle() 616 } 617} 618 619// Timestamp returns the timestamp chosen to perform reads and queries in this 620// transaction. The value can only be read after some read or query has either 621// returned some data or completed without returning any data. 622func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 623 t.mu.Lock() 624 defer t.mu.Unlock() 625 if t.rts.IsZero() { 626 return t.rts, errRtsUnavailable() 627 } 628 return t.rts, nil 629} 630 631// WithTimestampBound specifies the TimestampBound to use for read or query. 632// This can only be used before the first read or query is invoked. Note: 633// bounded staleness is not available with general ReadOnlyTransactions; use a 634// single-use ReadOnlyTransaction instead. 635// 636// The returned value is the ReadOnlyTransaction so calls can be chained. 637func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 638 t.mu.Lock() 639 defer t.mu.Unlock() 640 if t.state == txNew { 641 // Only allow to set TimestampBound before the first query. 642 t.tb = tb 643 } 644 return t 645} 646 647// ReadWriteTransaction provides a locking read-write transaction. 648// 649// This type of transaction is the only way to write data into Cloud Spanner; 650// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 651// transactions internally. These transactions rely on pessimistic locking and, 652// if necessary, two-phase commit. Locking read-write transactions may abort, 653// requiring the application to retry. However, the interface exposed by 654// (*Client).ReadWriteTransaction eliminates the need for applications to write 655// retry loops explicitly. 656// 657// Locking transactions may be used to atomically read-modify-write data 658// anywhere in a database. This type of transaction is externally consistent. 659// 660// Clients should attempt to minimize the amount of time a transaction is 661// active. Faster transactions commit with higher probability and cause less 662// contention. Cloud Spanner attempts to keep read locks active as long as the 663// transaction continues to do reads. Long periods of inactivity at the client 664// may cause Cloud Spanner to release a transaction's locks and abort it. 665// 666// Reads performed within a transaction acquire locks on the data being 667// read. Writes can only be done at commit time, after all reads have been 668// completed. Conceptually, a read-write transaction consists of zero or more 669// reads or SQL queries followed by a commit. 670// 671// See (*Client).ReadWriteTransaction for an example. 672// 673// Semantics 674// 675// Cloud Spanner can commit the transaction if all read locks it acquired are 676// still valid at commit time, and it is able to acquire write locks for all 677// writes. Cloud Spanner can abort the transaction for any reason. If a commit 678// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 679// not modified any user data in Cloud Spanner. 680// 681// Unless the transaction commits, Cloud Spanner makes no guarantees about how 682// long the transaction's locks were held for. It is an error to use Cloud 683// Spanner locks for any sort of mutual exclusion other than between Cloud 684// Spanner transactions themselves. 685// 686// Aborted transactions 687// 688// Application code does not need to retry explicitly; RunInTransaction will 689// automatically retry a transaction if an attempt results in an abort. The lock 690// priority of a transaction increases after each prior aborted transaction, 691// meaning that the next attempt has a slightly better chance of success than 692// before. 693// 694// Under some circumstances (e.g., many transactions attempting to modify the 695// same row(s)), a transaction can abort many times in a short period before 696// successfully committing. Thus, it is not a good idea to cap the number of 697// retries a transaction can attempt; instead, it is better to limit the total 698// amount of wall time spent retrying. 699// 700// Idle transactions 701// 702// A transaction is considered idle if it has no outstanding reads or SQL 703// queries and has not started a read or SQL query within the last 10 704// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 705// hold on to locks indefinitely. In that case, the commit will fail with error 706// ABORTED. 707// 708// If this behavior is undesirable, periodically executing a simple SQL query 709// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 710// idle. 711type ReadWriteTransaction struct { 712 // txReadOnly contains methods for performing transactional reads. 713 txReadOnly 714 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 715 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 716 // during the initialization of ReadWriteTransaction. 717 tx transactionID 718 // mu protects concurrent access to the internal states of 719 // ReadWriteTransaction. 720 mu sync.Mutex 721 // state is the current transaction status of the read-write transaction. 722 state txState 723 // wb is the set of buffered mutations waiting to be committed. 724 wb []*Mutation 725} 726 727// BufferWrite adds a list of mutations to the set of updates that will be 728// applied when the transaction is committed. It does not actually apply the 729// write until the transaction is committed, so the operation does not block. 730// The effects of the write won't be visible to any reads (including reads done 731// in the same transaction) until the transaction commits. 732// 733// See the example for Client.ReadWriteTransaction. 734func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 735 t.mu.Lock() 736 defer t.mu.Unlock() 737 if t.state == txClosed { 738 return errTxClosed() 739 } 740 if t.state != txActive { 741 return errUnexpectedTxState(t.state) 742 } 743 t.wb = append(t.wb, ms...) 744 return nil 745} 746 747// Update executes a DML statement against the database. It returns the number 748// of affected rows. Update returns an error if the statement is a query. 749// However, the query is executed, and any data read will be validated upon 750// commit. 751func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 752 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 753 defer func() { trace.EndSpan(ctx, err) }() 754 req, sh, err := t.prepareExecuteSQL(ctx, stmt, sppb.ExecuteSqlRequest_NORMAL) 755 if err != nil { 756 return 0, err 757 } 758 resultSet, err := sh.getClient().ExecuteSql(ctx, req) 759 if err != nil { 760 return 0, toSpannerError(err) 761 } 762 if resultSet.Stats == nil { 763 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 764 } 765 return extractRowCount(resultSet.Stats) 766} 767 768// BatchUpdate groups one or more DML statements and sends them to Spanner in a 769// single RPC. This is an efficient way to execute multiple DML statements. 770// 771// A slice of counts is returned, where each count represents the number of 772// affected rows for the given query at the same index. If an error occurs, 773// counts will be returned up to the query that encountered the error. 774func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 775 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 776 defer func() { trace.EndSpan(ctx, err) }() 777 778 sh, ts, err := t.acquire(ctx) 779 if err != nil { 780 return nil, err 781 } 782 // Cloud Spanner will return "Session not found" on bad sessions. 783 sid := sh.getID() 784 if sid == "" { 785 // Might happen if transaction is closed in the middle of a API call. 786 return nil, errSessionClosed(sh) 787 } 788 789 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 790 for _, st := range stmts { 791 params, paramTypes, err := st.convertParams() 792 if err != nil { 793 return nil, err 794 } 795 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 796 Sql: st.SQL, 797 Params: params, 798 ParamTypes: paramTypes, 799 }) 800 } 801 802 resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{ 803 Session: sh.getID(), 804 Transaction: ts, 805 Statements: sppbStmts, 806 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 807 }) 808 if err != nil { 809 return nil, toSpannerError(err) 810 } 811 812 var counts []int64 813 for _, rs := range resp.ResultSets { 814 count, err := extractRowCount(rs.Stats) 815 if err != nil { 816 return nil, err 817 } 818 counts = append(counts, count) 819 } 820 if resp.Status.Code != 0 { 821 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 822 } 823 return counts, nil 824} 825 826// acquire implements txReadEnv.acquire. 827func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 828 ts := &sppb.TransactionSelector{ 829 Selector: &sppb.TransactionSelector_Id{ 830 Id: t.tx, 831 }, 832 } 833 t.mu.Lock() 834 defer t.mu.Unlock() 835 switch t.state { 836 case txClosed: 837 return nil, nil, errTxClosed() 838 case txActive: 839 return t.sh, ts, nil 840 } 841 return nil, nil, errUnexpectedTxState(t.state) 842} 843 844// release implements txReadEnv.release. 845func (t *ReadWriteTransaction) release(err error) { 846 t.mu.Lock() 847 sh := t.sh 848 t.mu.Unlock() 849 if sh != nil && isSessionNotFoundError(err) { 850 sh.destroy() 851 } 852} 853 854func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { 855 res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 856 Session: sid, 857 Options: &sppb.TransactionOptions{ 858 Mode: &sppb.TransactionOptions_ReadWrite_{ 859 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 860 }, 861 }, 862 }) 863 if err != nil { 864 return nil, err 865 } 866 if res.Id == nil { 867 return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.") 868 } 869 return res.Id, nil 870} 871 872// begin starts a read-write transacton on Cloud Spanner, it is always called 873// before any of the public APIs. 874func (t *ReadWriteTransaction) begin(ctx context.Context) error { 875 if t.tx != nil { 876 t.state = txActive 877 return nil 878 } 879 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 880 if err == nil { 881 t.tx = tx 882 t.state = txActive 883 return nil 884 } 885 if isSessionNotFoundError(err) { 886 t.sh.destroy() 887 } 888 return err 889} 890 891// commit tries to commit a readwrite transaction to Cloud Spanner. It also 892// returns the commit timestamp for the transactions. 893func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { 894 var ts time.Time 895 t.mu.Lock() 896 t.state = txClosed // No further operations after commit. 897 mPb, err := mutationsProto(t.wb) 898 t.mu.Unlock() 899 if err != nil { 900 return ts, err 901 } 902 // In case that sessionHandle was destroyed but transaction body fails to 903 // report it. 904 sid, client := t.sh.getID(), t.sh.getClient() 905 if sid == "" || client == nil { 906 return ts, errSessionClosed(t.sh) 907 } 908 909 res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{ 910 Session: sid, 911 Transaction: &sppb.CommitRequest_TransactionId{ 912 TransactionId: t.tx, 913 }, 914 Mutations: mPb, 915 }) 916 if e != nil { 917 return ts, toSpannerErrorWithCommitInfo(e, true) 918 } 919 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 920 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 921 } 922 if isSessionNotFoundError(err) { 923 t.sh.destroy() 924 } 925 return ts, err 926} 927 928// rollback is called when a commit is aborted or the transaction body runs 929// into error. 930func (t *ReadWriteTransaction) rollback(ctx context.Context) { 931 t.mu.Lock() 932 // Forbid further operations on rollbacked transaction. 933 t.state = txClosed 934 t.mu.Unlock() 935 // In case that sessionHandle was destroyed but transaction body fails to 936 // report it. 937 sid, client := t.sh.getID(), t.sh.getClient() 938 if sid == "" || client == nil { 939 return 940 } 941 err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{ 942 Session: sid, 943 TransactionId: t.tx, 944 }) 945 if isSessionNotFoundError(err) { 946 t.sh.destroy() 947 } 948} 949 950// runInTransaction executes f under a read-write transaction context. 951func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) { 952 var ( 953 ts time.Time 954 err error 955 errDuringCommit bool 956 ) 957 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 958 // Try to commit if transaction body returns no error. 959 ts, err = t.commit(ctx) 960 errDuringCommit = err != nil 961 } 962 if err != nil { 963 if isAbortErr(err) { 964 // Retry the transaction using the same session on ABORT error. 965 // Cloud Spanner will create the new transaction with the previous 966 // one's wound-wait priority. 967 return ts, err 968 } 969 if isSessionNotFoundError(err) { 970 t.sh.destroy() 971 return ts, err 972 } 973 // Rollback the transaction unless the error occurred during the 974 // commit. Executing a rollback after a commit has failed will 975 // otherwise cause an error. Note that transient errors, such as 976 // UNAVAILABLE, are already handled in the gRPC layer and do not show 977 // up here. Context errors (deadline exceeded / canceled) during 978 // commits are also not rolled back. 979 if !errDuringCommit { 980 t.rollback(ctx) 981 } 982 return ts, err 983 } 984 // err == nil, return commit timestamp. 985 return ts, nil 986} 987 988// writeOnlyTransaction provides the most efficient way of doing write-only 989// transactions. It essentially does blind writes to Cloud Spanner. 990type writeOnlyTransaction struct { 991 // sp is the session pool which writeOnlyTransaction uses to get Cloud 992 // Spanner sessions for blind writes. 993 sp *sessionPool 994} 995 996// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 997// unless one of the following happens: 998// 999// 1) Context times out. 1000// 2) An unretryable error (e.g. database not found) occurs. 1001// 3) There is a malformed Mutation object. 1002func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 1003 var ( 1004 ts time.Time 1005 sh *sessionHandle 1006 ) 1007 mPb, err := mutationsProto(ms) 1008 if err != nil { 1009 // Malformed mutation found, just return the error. 1010 return ts, err 1011 } 1012 1013 // Retry-loop for aborted transactions. 1014 // TODO: Replace with generic retryer. 1015 for { 1016 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 1017 // No usable session for doing the commit, take one from pool. 1018 sh, err = t.sp.take(ctx) 1019 if err != nil { 1020 // sessionPool.Take already retries for session 1021 // creations/retrivals. 1022 return ts, err 1023 } 1024 defer sh.recycle() 1025 } 1026 res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 1027 Session: sh.getID(), 1028 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 1029 SingleUseTransaction: &sppb.TransactionOptions{ 1030 Mode: &sppb.TransactionOptions_ReadWrite_{ 1031 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1032 }, 1033 }, 1034 }, 1035 Mutations: mPb, 1036 }) 1037 if err != nil && !isAbortErr(err) { 1038 if isSessionNotFoundError(err) { 1039 // Discard the bad session. 1040 sh.destroy() 1041 } 1042 return ts, toSpannerErrorWithCommitInfo(err, true) 1043 } else if err == nil { 1044 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1045 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1046 } 1047 break 1048 } 1049 } 1050 return ts, toSpannerError(err) 1051} 1052 1053// isAbortedErr returns true if the error indicates that an gRPC call is 1054// aborted on the server side. 1055func isAbortErr(err error) bool { 1056 if err == nil { 1057 return false 1058 } 1059 if ErrCode(err) == codes.Aborted { 1060 return true 1061 } 1062 return false 1063} 1064