1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 vkit "cloud.google.com/go/spanner/apiv1" 27 "google.golang.org/api/iterator" 28 sppb "google.golang.org/genproto/googleapis/spanner/v1" 29 "google.golang.org/grpc/codes" 30) 31 32// transactionID stores a transaction ID which uniquely identifies a transaction 33// in Cloud Spanner. 34type transactionID []byte 35 36// txReadEnv manages a read-transaction environment consisting of a session 37// handle and a transaction selector. 38type txReadEnv interface { 39 // acquire returns a read-transaction environment that can be used to 40 // perform a transactional read. 41 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 42 // sets the transaction's read timestamp 43 setTimestamp(time.Time) 44 // release should be called at the end of every transactional read to deal 45 // with session recycling. 46 release(error) 47} 48 49// txReadOnly contains methods for doing transactional reads. 50type txReadOnly struct { 51 // read-transaction environment for performing transactional read 52 // operations. 53 txReadEnv 54 55 // Atomic. Only needed for DML statements, but used forall. 56 sequenceNumber int64 57 58 // replaceSessionFunc is a function that can be called to replace the 59 // session that is used by the transaction. This function should only be 60 // defined for single-use transactions that can safely be retried on a 61 // different session. All other transactions will set this function to nil. 62 replaceSessionFunc func(ctx context.Context) error 63 64 // sp is the session pool for allocating a session to execute the read-only 65 // transaction. It is set only once during initialization of the 66 // txReadOnly. 67 sp *sessionPool 68 // sh is the sessionHandle allocated from sp. 69 sh *sessionHandle 70 71 // qo provides options for executing a sql query. 72 qo QueryOptions 73} 74 75// errSessionClosed returns error for using a recycled/destroyed session 76func errSessionClosed(sh *sessionHandle) error { 77 return spannerErrorf(codes.FailedPrecondition, 78 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 79} 80 81// Read returns a RowIterator for reading multiple rows from the database. 82func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 83 return t.ReadWithOptions(ctx, table, keys, columns, nil) 84} 85 86// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 87func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 88 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 89} 90 91// ReadOptions provides options for reading rows from a database. 92type ReadOptions struct { 93 // The index to use for reading. If non-empty, you can only read columns 94 // that are part of the index key, part of the primary key, or stored in the 95 // index due to a STORING clause in the index definition. 96 Index string 97 98 // The maximum number of rows to read. A limit value less than 1 means no 99 // limit. 100 Limit int 101} 102 103// ReadWithOptions returns a RowIterator for reading multiple rows from the 104// database. Pass a ReadOptions to modify the read operation. 105func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 106 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 107 defer func() { trace.EndSpan(ctx, ri.err) }() 108 var ( 109 sh *sessionHandle 110 ts *sppb.TransactionSelector 111 err error 112 ) 113 kset, err := keys.keySetProto() 114 if err != nil { 115 return &RowIterator{err: err} 116 } 117 if sh, ts, err = t.acquire(ctx); err != nil { 118 return &RowIterator{err: err} 119 } 120 // Cloud Spanner will return "Session not found" on bad sessions. 121 sid, client := sh.getID(), sh.getClient() 122 if sid == "" || client == nil { 123 // Might happen if transaction is closed in the middle of a API call. 124 return &RowIterator{err: errSessionClosed(sh)} 125 } 126 index := "" 127 limit := 0 128 if opts != nil { 129 index = opts.Index 130 if opts.Limit > 0 { 131 limit = opts.Limit 132 } 133 } 134 return stream( 135 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 136 sh.session.logger, 137 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 138 return client.StreamingRead(ctx, 139 &sppb.ReadRequest{ 140 Session: sid, 141 Transaction: ts, 142 Table: table, 143 Index: index, 144 Columns: columns, 145 KeySet: kset, 146 ResumeToken: resumeToken, 147 Limit: int64(limit), 148 }) 149 }, 150 t.setTimestamp, 151 t.release, 152 ) 153} 154 155// errRowNotFound returns error for not being able to read the row identified by 156// key. 157func errRowNotFound(table string, key Key) error { 158 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 159} 160 161// errRowNotFoundByIndex returns error for not being able to read the row by index. 162func errRowNotFoundByIndex(table string, key Key, index string) error { 163 return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 164} 165 166// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index. 167func errMultipleRowsFound(table string, key Key, index string) error { 168 return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 169} 170 171// ReadRow reads a single row from the database. 172// 173// If no row is present with the given key, then ReadRow returns an error where 174// spanner.ErrCode(err) is codes.NotFound. 175func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 176 iter := t.Read(ctx, table, key, columns) 177 defer iter.Stop() 178 row, err := iter.Next() 179 switch err { 180 case iterator.Done: 181 return nil, errRowNotFound(table, key) 182 case nil: 183 return row, nil 184 default: 185 return nil, err 186 } 187} 188 189// ReadRowUsingIndex reads a single row from the database using an index. 190// 191// If no row is present with the given index, then ReadRowUsingIndex returns an 192// error where spanner.ErrCode(err) is codes.NotFound. 193// 194// If more than one row received with the given index, then ReadRowUsingIndex 195// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition. 196func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) { 197 iter := t.ReadUsingIndex(ctx, table, index, key, columns) 198 defer iter.Stop() 199 row, err := iter.Next() 200 switch err { 201 case iterator.Done: 202 return nil, errRowNotFoundByIndex(table, key, index) 203 case nil: 204 // If more than one row found, return an error. 205 _, err := iter.Next() 206 switch err { 207 case iterator.Done: 208 return row, nil 209 case nil: 210 return nil, errMultipleRowsFound(table, key, index) 211 default: 212 return nil, err 213 } 214 default: 215 return nil, err 216 } 217} 218 219// QueryOptions provides options for executing a sql query from a database. 220type QueryOptions struct { 221 Mode *sppb.ExecuteSqlRequest_QueryMode 222 Options *sppb.ExecuteSqlRequest_QueryOptions 223} 224 225// merge combines two QueryOptions that the input parameter will have higher 226// order of precedence. 227func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { 228 merged := QueryOptions{ 229 Mode: qo.Mode, 230 Options: &sppb.ExecuteSqlRequest_QueryOptions{}, 231 } 232 if opts.Mode != nil { 233 merged.Mode = opts.Mode 234 } 235 merged.Options.XXX_Merge(qo.Options) 236 merged.Options.XXX_Merge(opts.Options) 237 return merged 238} 239 240// Query executes a query against the database. It returns a RowIterator for 241// retrieving the resulting rows. 242// 243// Query returns only row data, without a query plan or execution statistics. 244// Use QueryWithStats to get rows along with the plan and statistics. Use 245// AnalyzeQuery to get just the plan. 246func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 247 mode := sppb.ExecuteSqlRequest_NORMAL 248 return t.query(ctx, statement, QueryOptions{ 249 Mode: &mode, 250 Options: t.qo.Options, 251 }) 252} 253 254// QueryWithOptions executes a SQL statment against the database. It returns 255// a RowIterator for retrieving the resulting rows. The sql query execution 256// will be optimized based on the given query options. 257func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator { 258 return t.query(ctx, statement, t.qo.merge(opts)) 259} 260 261// QueryWithStats executes a SQL statement against the database. It returns 262// a RowIterator for retrieving the resulting rows. The RowIterator will also 263// be populated with a query plan and execution statistics. 264func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 265 mode := sppb.ExecuteSqlRequest_PROFILE 266 return t.query(ctx, statement, QueryOptions{ 267 Mode: &mode, 268 Options: t.qo.Options, 269 }) 270} 271 272// AnalyzeQuery returns the query plan for statement. 273func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 274 mode := sppb.ExecuteSqlRequest_PLAN 275 iter := t.query(ctx, statement, QueryOptions{ 276 Mode: &mode, 277 Options: t.qo.Options, 278 }) 279 defer iter.Stop() 280 for { 281 _, err := iter.Next() 282 if err == iterator.Done { 283 break 284 } 285 if err != nil { 286 return nil, err 287 } 288 } 289 if iter.QueryPlan == nil { 290 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 291 } 292 return iter.QueryPlan, nil 293} 294 295func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) { 296 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 297 defer func() { trace.EndSpan(ctx, ri.err) }() 298 req, sh, err := t.prepareExecuteSQL(ctx, statement, options) 299 if err != nil { 300 return &RowIterator{err: err} 301 } 302 client := sh.getClient() 303 return streamWithReplaceSessionFunc( 304 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 305 sh.session.logger, 306 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 307 req.ResumeToken = resumeToken 308 req.Session = t.sh.getID() 309 return client.ExecuteStreamingSql(ctx, req) 310 }, 311 t.replaceSessionFunc, 312 t.setTimestamp, 313 t.release) 314} 315 316func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 317 sh, ts, err := t.acquire(ctx) 318 if err != nil { 319 return nil, nil, err 320 } 321 // Cloud Spanner will return "Session not found" on bad sessions. 322 sid := sh.getID() 323 if sid == "" { 324 // Might happen if transaction is closed in the middle of a API call. 325 return nil, nil, errSessionClosed(sh) 326 } 327 params, paramTypes, err := stmt.convertParams() 328 if err != nil { 329 return nil, nil, err 330 } 331 mode := sppb.ExecuteSqlRequest_NORMAL 332 if options.Mode != nil { 333 mode = *options.Mode 334 } 335 req := &sppb.ExecuteSqlRequest{ 336 Session: sid, 337 Transaction: ts, 338 Sql: stmt.SQL, 339 QueryMode: mode, 340 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 341 Params: params, 342 ParamTypes: paramTypes, 343 QueryOptions: options.Options, 344 } 345 return req, sh, nil 346} 347 348// txState is the status of a transaction. 349type txState int 350 351const ( 352 // transaction is new, waiting to be initialized.. 353 txNew txState = iota 354 // transaction is being initialized. 355 txInit 356 // transaction is active and can perform read/write. 357 txActive 358 // transaction is closed, cannot be used anymore. 359 txClosed 360) 361 362// errRtsUnavailable returns error for read transaction's read timestamp being 363// unavailable. 364func errRtsUnavailable() error { 365 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 366} 367 368// errTxClosed returns error for using a closed transaction. 369func errTxClosed() error { 370 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 371} 372 373// errUnexpectedTxState returns error for transaction enters an unexpected state. 374func errUnexpectedTxState(ts txState) error { 375 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 376} 377 378// ReadOnlyTransaction provides a snapshot transaction with guaranteed 379// consistency across reads, but does not allow writes. Read-only transactions 380// can be configured to read at timestamps in the past. 381// 382// Read-only transactions do not take locks. Instead, they work by choosing a 383// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 384// they do not acquire locks, they do not block concurrent read-write 385// transactions. 386// 387// Unlike locking read-write transactions, read-only transactions never abort. 388// They can fail if the chosen read timestamp is garbage collected; however, the 389// default garbage collection policy is generous enough that most applications 390// do not need to worry about this in practice. See the documentation of 391// TimestampBound for more details. 392// 393// A ReadOnlyTransaction consumes resources on the server until Close is called. 394type ReadOnlyTransaction struct { 395 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 396 mu sync.Mutex 397 // txReadOnly contains methods for performing transactional reads. 398 txReadOnly 399 // singleUse indicates that the transaction can be used for only one read. 400 singleUse bool 401 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 402 // ReadOnlyTransaction. 403 tx transactionID 404 // txReadyOrClosed is for broadcasting that transaction ID has been returned 405 // by Cloud Spanner or that transaction is closed. 406 txReadyOrClosed chan struct{} 407 // state is the current transaction status of the ReadOnly transaction. 408 state txState 409 // rts is the read timestamp returned by transactional reads. 410 rts time.Time 411 // tb is the read staleness bound specification for transactional reads. 412 tb TimestampBound 413} 414 415// errTxInitTimeout returns error for timeout in waiting for initialization of 416// the transaction. 417func errTxInitTimeout() error { 418 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 419} 420 421// getTimestampBound returns the read staleness bound specified for the 422// ReadOnlyTransaction. 423func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 424 t.mu.Lock() 425 defer t.mu.Unlock() 426 return t.tb 427} 428 429// begin starts a snapshot read-only Transaction on Cloud Spanner. 430func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 431 var ( 432 locked bool 433 tx transactionID 434 rts time.Time 435 sh *sessionHandle 436 err error 437 res *sppb.Transaction 438 ) 439 defer func() { 440 if !locked { 441 t.mu.Lock() 442 // Not necessary, just to make it clear that t.mu is being held when 443 // locked == true. 444 locked = true 445 } 446 if t.state != txClosed { 447 // Signal other initialization routines. 448 close(t.txReadyOrClosed) 449 t.txReadyOrClosed = make(chan struct{}) 450 } 451 t.mu.Unlock() 452 if err != nil && sh != nil { 453 // Got a valid session handle, but failed to initialize transaction= 454 // on Cloud Spanner. 455 if isSessionNotFoundError(err) { 456 sh.destroy() 457 } 458 // If sh.destroy was already executed, this becomes a noop. 459 sh.recycle() 460 } 461 }() 462 // Retry the BeginTransaction call if a 'Session not found' is returned. 463 for { 464 sh, err = t.sp.take(ctx) 465 if err != nil { 466 return err 467 } 468 res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 469 Session: sh.getID(), 470 Options: &sppb.TransactionOptions{ 471 Mode: &sppb.TransactionOptions_ReadOnly_{ 472 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 473 }, 474 }, 475 }) 476 if isSessionNotFoundError(err) { 477 sh.destroy() 478 continue 479 } else if err == nil { 480 tx = res.Id 481 if res.ReadTimestamp != nil { 482 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 483 } 484 } else { 485 err = toSpannerError(err) 486 } 487 break 488 } 489 t.mu.Lock() 490 491 // defer function will be executed with t.mu being held. 492 locked = true 493 494 // During the execution of t.begin(), t.Close() was invoked. 495 if t.state == txClosed { 496 return errSessionClosed(sh) 497 } 498 499 // If begin() fails, this allows other queries to take over the 500 // initialization. 501 t.tx = nil 502 if err == nil { 503 t.tx = tx 504 t.rts = rts 505 t.sh = sh 506 // State transite to txActive. 507 t.state = txActive 508 } 509 return err 510} 511 512// acquire implements txReadEnv.acquire. 513func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 514 if err := checkNestedTxn(ctx); err != nil { 515 return nil, nil, err 516 } 517 if t.singleUse { 518 return t.acquireSingleUse(ctx) 519 } 520 return t.acquireMultiUse(ctx) 521} 522 523func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 524 t.mu.Lock() 525 defer t.mu.Unlock() 526 switch t.state { 527 case txClosed: 528 // A closed single-use transaction can never be reused. 529 return nil, nil, errTxClosed() 530 case txNew: 531 t.state = txClosed 532 ts := &sppb.TransactionSelector{ 533 Selector: &sppb.TransactionSelector_SingleUse{ 534 SingleUse: &sppb.TransactionOptions{ 535 Mode: &sppb.TransactionOptions_ReadOnly_{ 536 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 537 }, 538 }, 539 }, 540 } 541 sh, err := t.sp.take(ctx) 542 if err != nil { 543 return nil, nil, err 544 } 545 546 // Install session handle into t, which can be used for readonly 547 // operations later. 548 t.sh = sh 549 return sh, ts, nil 550 } 551 us := t.state 552 553 // SingleUse transaction should only be in either txNew state or txClosed 554 // state. 555 return nil, nil, errUnexpectedTxState(us) 556} 557 558func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 559 for { 560 t.mu.Lock() 561 switch t.state { 562 case txClosed: 563 t.mu.Unlock() 564 return nil, nil, errTxClosed() 565 case txNew: 566 // State transit to txInit so that no further TimestampBound change 567 // is accepted. 568 t.state = txInit 569 t.mu.Unlock() 570 continue 571 case txInit: 572 if t.tx != nil { 573 // Wait for a transaction ID to become ready. 574 txReadyOrClosed := t.txReadyOrClosed 575 t.mu.Unlock() 576 select { 577 case <-txReadyOrClosed: 578 // Need to check transaction state again. 579 continue 580 case <-ctx.Done(): 581 // The waiting for initialization is timeout, return error 582 // directly. 583 return nil, nil, errTxInitTimeout() 584 } 585 } 586 // Take the ownership of initializing the transaction. 587 t.tx = transactionID{} 588 t.mu.Unlock() 589 // Begin a read-only transaction. 590 // 591 // TODO: consider adding a transaction option which allow queries to 592 // initiate transactions by themselves. Note that this option might 593 // not be always good because the ID of the new transaction won't 594 // be ready till the query returns some data or completes. 595 if err := t.begin(ctx); err != nil { 596 return nil, nil, err 597 } 598 599 // If t.begin() succeeded, t.state should have been changed to 600 // txActive, so we can just continue here. 601 continue 602 case txActive: 603 sh := t.sh 604 ts := &sppb.TransactionSelector{ 605 Selector: &sppb.TransactionSelector_Id{ 606 Id: t.tx, 607 }, 608 } 609 t.mu.Unlock() 610 return sh, ts, nil 611 } 612 state := t.state 613 t.mu.Unlock() 614 return nil, nil, errUnexpectedTxState(state) 615 } 616} 617 618func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 619 t.mu.Lock() 620 defer t.mu.Unlock() 621 if t.rts.IsZero() { 622 t.rts = ts 623 } 624} 625 626// release implements txReadEnv.release. 627func (t *ReadOnlyTransaction) release(err error) { 628 t.mu.Lock() 629 sh := t.sh 630 t.mu.Unlock() 631 if sh != nil { // sh could be nil if t.acquire() fails. 632 if isSessionNotFoundError(err) { 633 sh.destroy() 634 } 635 if t.singleUse { 636 // If session handle is already destroyed, this becomes a noop. 637 sh.recycle() 638 } 639 } 640} 641 642// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 643// after being closed. 644func (t *ReadOnlyTransaction) Close() { 645 if t.singleUse { 646 return 647 } 648 t.mu.Lock() 649 if t.state != txClosed { 650 t.state = txClosed 651 close(t.txReadyOrClosed) 652 } 653 sh := t.sh 654 t.mu.Unlock() 655 if sh == nil { 656 return 657 } 658 // If session handle is already destroyed, this becomes a noop. If there are 659 // still active queries and if the recycled session is reused before they 660 // complete, Cloud Spanner will cancel them on behalf of the new transaction 661 // on the session. 662 if sh != nil { 663 sh.recycle() 664 } 665} 666 667// Timestamp returns the timestamp chosen to perform reads and queries in this 668// transaction. The value can only be read after some read or query has either 669// returned some data or completed without returning any data. 670func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 671 t.mu.Lock() 672 defer t.mu.Unlock() 673 if t.rts.IsZero() { 674 return t.rts, errRtsUnavailable() 675 } 676 return t.rts, nil 677} 678 679// WithTimestampBound specifies the TimestampBound to use for read or query. 680// This can only be used before the first read or query is invoked. Note: 681// bounded staleness is not available with general ReadOnlyTransactions; use a 682// single-use ReadOnlyTransaction instead. 683// 684// The returned value is the ReadOnlyTransaction so calls can be chained. 685func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 686 t.mu.Lock() 687 defer t.mu.Unlock() 688 if t.state == txNew { 689 // Only allow to set TimestampBound before the first query. 690 t.tb = tb 691 } 692 return t 693} 694 695// ReadWriteTransaction provides a locking read-write transaction. 696// 697// This type of transaction is the only way to write data into Cloud Spanner; 698// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 699// transactions internally. These transactions rely on pessimistic locking and, 700// if necessary, two-phase commit. Locking read-write transactions may abort, 701// requiring the application to retry. However, the interface exposed by 702// (*Client).ReadWriteTransaction eliminates the need for applications to write 703// retry loops explicitly. 704// 705// Locking transactions may be used to atomically read-modify-write data 706// anywhere in a database. This type of transaction is externally consistent. 707// 708// Clients should attempt to minimize the amount of time a transaction is 709// active. Faster transactions commit with higher probability and cause less 710// contention. Cloud Spanner attempts to keep read locks active as long as the 711// transaction continues to do reads. Long periods of inactivity at the client 712// may cause Cloud Spanner to release a transaction's locks and abort it. 713// 714// Reads performed within a transaction acquire locks on the data being 715// read. Writes can only be done at commit time, after all reads have been 716// completed. Conceptually, a read-write transaction consists of zero or more 717// reads or SQL queries followed by a commit. 718// 719// See (*Client).ReadWriteTransaction for an example. 720// 721// Semantics 722// 723// Cloud Spanner can commit the transaction if all read locks it acquired are 724// still valid at commit time, and it is able to acquire write locks for all 725// writes. Cloud Spanner can abort the transaction for any reason. If a commit 726// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 727// not modified any user data in Cloud Spanner. 728// 729// Unless the transaction commits, Cloud Spanner makes no guarantees about how 730// long the transaction's locks were held for. It is an error to use Cloud 731// Spanner locks for any sort of mutual exclusion other than between Cloud 732// Spanner transactions themselves. 733// 734// Aborted transactions 735// 736// Application code does not need to retry explicitly; RunInTransaction will 737// automatically retry a transaction if an attempt results in an abort. The lock 738// priority of a transaction increases after each prior aborted transaction, 739// meaning that the next attempt has a slightly better chance of success than 740// before. 741// 742// Under some circumstances (e.g., many transactions attempting to modify the 743// same row(s)), a transaction can abort many times in a short period before 744// successfully committing. Thus, it is not a good idea to cap the number of 745// retries a transaction can attempt; instead, it is better to limit the total 746// amount of wall time spent retrying. 747// 748// Idle transactions 749// 750// A transaction is considered idle if it has no outstanding reads or SQL 751// queries and has not started a read or SQL query within the last 10 752// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 753// hold on to locks indefinitely. In that case, the commit will fail with error 754// ABORTED. 755// 756// If this behavior is undesirable, periodically executing a simple SQL query 757// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 758// idle. 759type ReadWriteTransaction struct { 760 // txReadOnly contains methods for performing transactional reads. 761 txReadOnly 762 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 763 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 764 // during the initialization of ReadWriteTransaction. 765 tx transactionID 766 // mu protects concurrent access to the internal states of 767 // ReadWriteTransaction. 768 mu sync.Mutex 769 // state is the current transaction status of the read-write transaction. 770 state txState 771 // wb is the set of buffered mutations waiting to be committed. 772 wb []*Mutation 773} 774 775// BufferWrite adds a list of mutations to the set of updates that will be 776// applied when the transaction is committed. It does not actually apply the 777// write until the transaction is committed, so the operation does not block. 778// The effects of the write won't be visible to any reads (including reads done 779// in the same transaction) until the transaction commits. 780// 781// See the example for Client.ReadWriteTransaction. 782func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 783 t.mu.Lock() 784 defer t.mu.Unlock() 785 if t.state == txClosed { 786 return errTxClosed() 787 } 788 if t.state != txActive { 789 return errUnexpectedTxState(t.state) 790 } 791 t.wb = append(t.wb, ms...) 792 return nil 793} 794 795// Update executes a DML statement against the database. It returns the number 796// of affected rows. Update returns an error if the statement is a query. 797// However, the query is executed, and any data read will be validated upon 798// commit. 799func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 800 mode := sppb.ExecuteSqlRequest_NORMAL 801 return t.update(ctx, stmt, QueryOptions{ 802 Mode: &mode, 803 Options: t.qo.Options, 804 }) 805} 806 807// UpdateWithOptions executes a DML statement against the database. It returns 808// the number of affected rows. The sql query execution will be optimized 809// based on the given query options. 810func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 811 return t.update(ctx, stmt, t.qo.merge(opts)) 812} 813 814func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 815 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 816 defer func() { trace.EndSpan(ctx, err) }() 817 req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts) 818 if err != nil { 819 return 0, err 820 } 821 resultSet, err := sh.getClient().ExecuteSql(ctx, req) 822 if err != nil { 823 return 0, toSpannerError(err) 824 } 825 if resultSet.Stats == nil { 826 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 827 } 828 return extractRowCount(resultSet.Stats) 829} 830 831// BatchUpdate groups one or more DML statements and sends them to Spanner in a 832// single RPC. This is an efficient way to execute multiple DML statements. 833// 834// A slice of counts is returned, where each count represents the number of 835// affected rows for the given query at the same index. If an error occurs, 836// counts will be returned up to the query that encountered the error. 837func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 838 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 839 defer func() { trace.EndSpan(ctx, err) }() 840 841 sh, ts, err := t.acquire(ctx) 842 if err != nil { 843 return nil, err 844 } 845 // Cloud Spanner will return "Session not found" on bad sessions. 846 sid := sh.getID() 847 if sid == "" { 848 // Might happen if transaction is closed in the middle of a API call. 849 return nil, errSessionClosed(sh) 850 } 851 852 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 853 for _, st := range stmts { 854 params, paramTypes, err := st.convertParams() 855 if err != nil { 856 return nil, err 857 } 858 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 859 Sql: st.SQL, 860 Params: params, 861 ParamTypes: paramTypes, 862 }) 863 } 864 865 resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{ 866 Session: sh.getID(), 867 Transaction: ts, 868 Statements: sppbStmts, 869 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 870 }) 871 if err != nil { 872 return nil, toSpannerError(err) 873 } 874 875 var counts []int64 876 for _, rs := range resp.ResultSets { 877 count, err := extractRowCount(rs.Stats) 878 if err != nil { 879 return nil, err 880 } 881 counts = append(counts, count) 882 } 883 if resp.Status.Code != 0 { 884 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 885 } 886 return counts, nil 887} 888 889// acquire implements txReadEnv.acquire. 890func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 891 ts := &sppb.TransactionSelector{ 892 Selector: &sppb.TransactionSelector_Id{ 893 Id: t.tx, 894 }, 895 } 896 t.mu.Lock() 897 defer t.mu.Unlock() 898 switch t.state { 899 case txClosed: 900 return nil, nil, errTxClosed() 901 case txActive: 902 return t.sh, ts, nil 903 } 904 return nil, nil, errUnexpectedTxState(t.state) 905} 906 907// release implements txReadEnv.release. 908func (t *ReadWriteTransaction) release(err error) { 909 t.mu.Lock() 910 sh := t.sh 911 t.mu.Unlock() 912 if sh != nil && isSessionNotFoundError(err) { 913 sh.destroy() 914 } 915} 916 917func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { 918 res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 919 Session: sid, 920 Options: &sppb.TransactionOptions{ 921 Mode: &sppb.TransactionOptions_ReadWrite_{ 922 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 923 }, 924 }, 925 }) 926 if err != nil { 927 return nil, err 928 } 929 if res.Id == nil { 930 return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.") 931 } 932 return res.Id, nil 933} 934 935// begin starts a read-write transacton on Cloud Spanner, it is always called 936// before any of the public APIs. 937func (t *ReadWriteTransaction) begin(ctx context.Context) error { 938 if t.tx != nil { 939 t.state = txActive 940 return nil 941 } 942 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 943 if err == nil { 944 t.tx = tx 945 t.state = txActive 946 return nil 947 } 948 if isSessionNotFoundError(err) { 949 t.sh.destroy() 950 } 951 return err 952} 953 954// commit tries to commit a readwrite transaction to Cloud Spanner. It also 955// returns the commit timestamp for the transactions. 956func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { 957 var ts time.Time 958 t.mu.Lock() 959 t.state = txClosed // No further operations after commit. 960 mPb, err := mutationsProto(t.wb) 961 t.mu.Unlock() 962 if err != nil { 963 return ts, err 964 } 965 // In case that sessionHandle was destroyed but transaction body fails to 966 // report it. 967 sid, client := t.sh.getID(), t.sh.getClient() 968 if sid == "" || client == nil { 969 return ts, errSessionClosed(t.sh) 970 } 971 972 res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{ 973 Session: sid, 974 Transaction: &sppb.CommitRequest_TransactionId{ 975 TransactionId: t.tx, 976 }, 977 Mutations: mPb, 978 }) 979 if e != nil { 980 return ts, toSpannerErrorWithCommitInfo(e, true) 981 } 982 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 983 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 984 } 985 if isSessionNotFoundError(err) { 986 t.sh.destroy() 987 } 988 return ts, err 989} 990 991// rollback is called when a commit is aborted or the transaction body runs 992// into error. 993func (t *ReadWriteTransaction) rollback(ctx context.Context) { 994 t.mu.Lock() 995 // Forbid further operations on rollbacked transaction. 996 t.state = txClosed 997 t.mu.Unlock() 998 // In case that sessionHandle was destroyed but transaction body fails to 999 // report it. 1000 sid, client := t.sh.getID(), t.sh.getClient() 1001 if sid == "" || client == nil { 1002 return 1003 } 1004 err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{ 1005 Session: sid, 1006 TransactionId: t.tx, 1007 }) 1008 if isSessionNotFoundError(err) { 1009 t.sh.destroy() 1010 } 1011} 1012 1013// runInTransaction executes f under a read-write transaction context. 1014func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) { 1015 var ( 1016 ts time.Time 1017 err error 1018 errDuringCommit bool 1019 ) 1020 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 1021 // Try to commit if transaction body returns no error. 1022 ts, err = t.commit(ctx) 1023 errDuringCommit = err != nil 1024 } 1025 if err != nil { 1026 if isAbortErr(err) { 1027 // Retry the transaction using the same session on ABORT error. 1028 // Cloud Spanner will create the new transaction with the previous 1029 // one's wound-wait priority. 1030 return ts, err 1031 } 1032 if isSessionNotFoundError(err) { 1033 t.sh.destroy() 1034 return ts, err 1035 } 1036 // Rollback the transaction unless the error occurred during the 1037 // commit. Executing a rollback after a commit has failed will 1038 // otherwise cause an error. Note that transient errors, such as 1039 // UNAVAILABLE, are already handled in the gRPC layer and do not show 1040 // up here. Context errors (deadline exceeded / canceled) during 1041 // commits are also not rolled back. 1042 if !errDuringCommit { 1043 t.rollback(ctx) 1044 } 1045 return ts, err 1046 } 1047 // err == nil, return commit timestamp. 1048 return ts, nil 1049} 1050 1051// writeOnlyTransaction provides the most efficient way of doing write-only 1052// transactions. It essentially does blind writes to Cloud Spanner. 1053type writeOnlyTransaction struct { 1054 // sp is the session pool which writeOnlyTransaction uses to get Cloud 1055 // Spanner sessions for blind writes. 1056 sp *sessionPool 1057} 1058 1059// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 1060// unless one of the following happens: 1061// 1062// 1) Context times out. 1063// 2) An unretryable error (e.g. database not found) occurs. 1064// 3) There is a malformed Mutation object. 1065func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 1066 var ( 1067 ts time.Time 1068 sh *sessionHandle 1069 ) 1070 mPb, err := mutationsProto(ms) 1071 if err != nil { 1072 // Malformed mutation found, just return the error. 1073 return ts, err 1074 } 1075 1076 // Retry-loop for aborted transactions. 1077 // TODO: Replace with generic retryer. 1078 for { 1079 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 1080 // No usable session for doing the commit, take one from pool. 1081 sh, err = t.sp.take(ctx) 1082 if err != nil { 1083 // sessionPool.Take already retries for session 1084 // creations/retrivals. 1085 return ts, err 1086 } 1087 defer sh.recycle() 1088 } 1089 res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 1090 Session: sh.getID(), 1091 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 1092 SingleUseTransaction: &sppb.TransactionOptions{ 1093 Mode: &sppb.TransactionOptions_ReadWrite_{ 1094 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1095 }, 1096 }, 1097 }, 1098 Mutations: mPb, 1099 }) 1100 if err != nil && !isAbortErr(err) { 1101 if isSessionNotFoundError(err) { 1102 // Discard the bad session. 1103 sh.destroy() 1104 } 1105 return ts, toSpannerErrorWithCommitInfo(err, true) 1106 } else if err == nil { 1107 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1108 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1109 } 1110 break 1111 } 1112 } 1113 return ts, toSpannerError(err) 1114} 1115 1116// isAbortedErr returns true if the error indicates that an gRPC call is 1117// aborted on the server side. 1118func isAbortErr(err error) bool { 1119 if err == nil { 1120 return false 1121 } 1122 if ErrCode(err) == codes.Aborted { 1123 return true 1124 } 1125 return false 1126} 1127