1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 vkit "cloud.google.com/go/spanner/apiv1" 27 "github.com/golang/protobuf/proto" 28 "google.golang.org/api/iterator" 29 sppb "google.golang.org/genproto/googleapis/spanner/v1" 30 "google.golang.org/grpc/codes" 31) 32 33// transactionID stores a transaction ID which uniquely identifies a transaction 34// in Cloud Spanner. 35type transactionID []byte 36 37// txReadEnv manages a read-transaction environment consisting of a session 38// handle and a transaction selector. 39type txReadEnv interface { 40 // acquire returns a read-transaction environment that can be used to 41 // perform a transactional read. 42 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 43 // sets the transaction's read timestamp 44 setTimestamp(time.Time) 45 // release should be called at the end of every transactional read to deal 46 // with session recycling. 47 release(error) 48} 49 50// txReadOnly contains methods for doing transactional reads. 51type txReadOnly struct { 52 // read-transaction environment for performing transactional read 53 // operations. 54 txReadEnv 55 56 // Atomic. Only needed for DML statements, but used forall. 57 sequenceNumber int64 58 59 // replaceSessionFunc is a function that can be called to replace the 60 // session that is used by the transaction. This function should only be 61 // defined for single-use transactions that can safely be retried on a 62 // different session. All other transactions will set this function to nil. 63 replaceSessionFunc func(ctx context.Context) error 64 65 // sp is the session pool for allocating a session to execute the read-only 66 // transaction. It is set only once during initialization of the 67 // txReadOnly. 68 sp *sessionPool 69 // sh is the sessionHandle allocated from sp. 70 sh *sessionHandle 71 72 // qo provides options for executing a sql query. 73 qo QueryOptions 74} 75 76// errSessionClosed returns error for using a recycled/destroyed session 77func errSessionClosed(sh *sessionHandle) error { 78 return spannerErrorf(codes.FailedPrecondition, 79 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 80} 81 82// Read returns a RowIterator for reading multiple rows from the database. 83func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 84 return t.ReadWithOptions(ctx, table, keys, columns, nil) 85} 86 87// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 88func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 89 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 90} 91 92// ReadOptions provides options for reading rows from a database. 93type ReadOptions struct { 94 // The index to use for reading. If non-empty, you can only read columns 95 // that are part of the index key, part of the primary key, or stored in the 96 // index due to a STORING clause in the index definition. 97 Index string 98 99 // The maximum number of rows to read. A limit value less than 1 means no 100 // limit. 101 Limit int 102} 103 104// ReadWithOptions returns a RowIterator for reading multiple rows from the 105// database. Pass a ReadOptions to modify the read operation. 106func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 107 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 108 defer func() { trace.EndSpan(ctx, ri.err) }() 109 var ( 110 sh *sessionHandle 111 ts *sppb.TransactionSelector 112 err error 113 ) 114 kset, err := keys.keySetProto() 115 if err != nil { 116 return &RowIterator{err: err} 117 } 118 if sh, ts, err = t.acquire(ctx); err != nil { 119 return &RowIterator{err: err} 120 } 121 // Cloud Spanner will return "Session not found" on bad sessions. 122 sid, client := sh.getID(), sh.getClient() 123 if sid == "" || client == nil { 124 // Might happen if transaction is closed in the middle of a API call. 125 return &RowIterator{err: errSessionClosed(sh)} 126 } 127 index := "" 128 limit := 0 129 if opts != nil { 130 index = opts.Index 131 if opts.Limit > 0 { 132 limit = opts.Limit 133 } 134 } 135 return stream( 136 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 137 sh.session.logger, 138 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 139 return client.StreamingRead(ctx, 140 &sppb.ReadRequest{ 141 Session: sid, 142 Transaction: ts, 143 Table: table, 144 Index: index, 145 Columns: columns, 146 KeySet: kset, 147 ResumeToken: resumeToken, 148 Limit: int64(limit), 149 }) 150 }, 151 t.setTimestamp, 152 t.release, 153 ) 154} 155 156// errRowNotFound returns error for not being able to read the row identified by 157// key. 158func errRowNotFound(table string, key Key) error { 159 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 160} 161 162// errRowNotFoundByIndex returns error for not being able to read the row by index. 163func errRowNotFoundByIndex(table string, key Key, index string) error { 164 return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 165} 166 167// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index. 168func errMultipleRowsFound(table string, key Key, index string) error { 169 return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 170} 171 172// ReadRow reads a single row from the database. 173// 174// If no row is present with the given key, then ReadRow returns an error where 175// spanner.ErrCode(err) is codes.NotFound. 176func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 177 iter := t.Read(ctx, table, key, columns) 178 defer iter.Stop() 179 row, err := iter.Next() 180 switch err { 181 case iterator.Done: 182 return nil, errRowNotFound(table, key) 183 case nil: 184 return row, nil 185 default: 186 return nil, err 187 } 188} 189 190// ReadRowUsingIndex reads a single row from the database using an index. 191// 192// If no row is present with the given index, then ReadRowUsingIndex returns an 193// error where spanner.ErrCode(err) is codes.NotFound. 194// 195// If more than one row received with the given index, then ReadRowUsingIndex 196// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition. 197func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) { 198 iter := t.ReadUsingIndex(ctx, table, index, key, columns) 199 defer iter.Stop() 200 row, err := iter.Next() 201 switch err { 202 case iterator.Done: 203 return nil, errRowNotFoundByIndex(table, key, index) 204 case nil: 205 // If more than one row found, return an error. 206 _, err := iter.Next() 207 switch err { 208 case iterator.Done: 209 return row, nil 210 case nil: 211 return nil, errMultipleRowsFound(table, key, index) 212 default: 213 return nil, err 214 } 215 default: 216 return nil, err 217 } 218} 219 220// QueryOptions provides options for executing a sql query from a database. 221type QueryOptions struct { 222 Mode *sppb.ExecuteSqlRequest_QueryMode 223 Options *sppb.ExecuteSqlRequest_QueryOptions 224} 225 226// merge combines two QueryOptions that the input parameter will have higher 227// order of precedence. 228func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { 229 merged := QueryOptions{ 230 Mode: qo.Mode, 231 Options: &sppb.ExecuteSqlRequest_QueryOptions{}, 232 } 233 if opts.Mode != nil { 234 merged.Mode = opts.Mode 235 } 236 proto.Merge(merged.Options, qo.Options) 237 proto.Merge(merged.Options, opts.Options) 238 return merged 239} 240 241// Query executes a query against the database. It returns a RowIterator for 242// retrieving the resulting rows. 243// 244// Query returns only row data, without a query plan or execution statistics. 245// Use QueryWithStats to get rows along with the plan and statistics. Use 246// AnalyzeQuery to get just the plan. 247func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 248 mode := sppb.ExecuteSqlRequest_NORMAL 249 return t.query(ctx, statement, QueryOptions{ 250 Mode: &mode, 251 Options: t.qo.Options, 252 }) 253} 254 255// QueryWithOptions executes a SQL statment against the database. It returns 256// a RowIterator for retrieving the resulting rows. The sql query execution 257// will be optimized based on the given query options. 258func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator { 259 return t.query(ctx, statement, t.qo.merge(opts)) 260} 261 262// QueryWithStats executes a SQL statement against the database. It returns 263// a RowIterator for retrieving the resulting rows. The RowIterator will also 264// be populated with a query plan and execution statistics. 265func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 266 mode := sppb.ExecuteSqlRequest_PROFILE 267 return t.query(ctx, statement, QueryOptions{ 268 Mode: &mode, 269 Options: t.qo.Options, 270 }) 271} 272 273// AnalyzeQuery returns the query plan for statement. 274func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 275 mode := sppb.ExecuteSqlRequest_PLAN 276 iter := t.query(ctx, statement, QueryOptions{ 277 Mode: &mode, 278 Options: t.qo.Options, 279 }) 280 defer iter.Stop() 281 for { 282 _, err := iter.Next() 283 if err == iterator.Done { 284 break 285 } 286 if err != nil { 287 return nil, err 288 } 289 } 290 if iter.QueryPlan == nil { 291 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 292 } 293 return iter.QueryPlan, nil 294} 295 296func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) { 297 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 298 defer func() { trace.EndSpan(ctx, ri.err) }() 299 req, sh, err := t.prepareExecuteSQL(ctx, statement, options) 300 if err != nil { 301 return &RowIterator{err: err} 302 } 303 client := sh.getClient() 304 return streamWithReplaceSessionFunc( 305 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 306 sh.session.logger, 307 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 308 req.ResumeToken = resumeToken 309 req.Session = t.sh.getID() 310 return client.ExecuteStreamingSql(ctx, req) 311 }, 312 t.replaceSessionFunc, 313 t.setTimestamp, 314 t.release) 315} 316 317func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 318 sh, ts, err := t.acquire(ctx) 319 if err != nil { 320 return nil, nil, err 321 } 322 // Cloud Spanner will return "Session not found" on bad sessions. 323 sid := sh.getID() 324 if sid == "" { 325 // Might happen if transaction is closed in the middle of a API call. 326 return nil, nil, errSessionClosed(sh) 327 } 328 params, paramTypes, err := stmt.convertParams() 329 if err != nil { 330 return nil, nil, err 331 } 332 mode := sppb.ExecuteSqlRequest_NORMAL 333 if options.Mode != nil { 334 mode = *options.Mode 335 } 336 req := &sppb.ExecuteSqlRequest{ 337 Session: sid, 338 Transaction: ts, 339 Sql: stmt.SQL, 340 QueryMode: mode, 341 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 342 Params: params, 343 ParamTypes: paramTypes, 344 QueryOptions: options.Options, 345 } 346 return req, sh, nil 347} 348 349// txState is the status of a transaction. 350type txState int 351 352const ( 353 // transaction is new, waiting to be initialized.. 354 txNew txState = iota 355 // transaction is being initialized. 356 txInit 357 // transaction is active and can perform read/write. 358 txActive 359 // transaction is closed, cannot be used anymore. 360 txClosed 361) 362 363// errRtsUnavailable returns error for read transaction's read timestamp being 364// unavailable. 365func errRtsUnavailable() error { 366 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 367} 368 369// errTxClosed returns error for using a closed transaction. 370func errTxClosed() error { 371 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 372} 373 374// errUnexpectedTxState returns error for transaction enters an unexpected state. 375func errUnexpectedTxState(ts txState) error { 376 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 377} 378 379// ReadOnlyTransaction provides a snapshot transaction with guaranteed 380// consistency across reads, but does not allow writes. Read-only transactions 381// can be configured to read at timestamps in the past. 382// 383// Read-only transactions do not take locks. Instead, they work by choosing a 384// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 385// they do not acquire locks, they do not block concurrent read-write 386// transactions. 387// 388// Unlike locking read-write transactions, read-only transactions never abort. 389// They can fail if the chosen read timestamp is garbage collected; however, the 390// default garbage collection policy is generous enough that most applications 391// do not need to worry about this in practice. See the documentation of 392// TimestampBound for more details. 393// 394// A ReadOnlyTransaction consumes resources on the server until Close is called. 395type ReadOnlyTransaction struct { 396 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 397 mu sync.Mutex 398 // txReadOnly contains methods for performing transactional reads. 399 txReadOnly 400 // singleUse indicates that the transaction can be used for only one read. 401 singleUse bool 402 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 403 // ReadOnlyTransaction. 404 tx transactionID 405 // txReadyOrClosed is for broadcasting that transaction ID has been returned 406 // by Cloud Spanner or that transaction is closed. 407 txReadyOrClosed chan struct{} 408 // state is the current transaction status of the ReadOnly transaction. 409 state txState 410 // rts is the read timestamp returned by transactional reads. 411 rts time.Time 412 // tb is the read staleness bound specification for transactional reads. 413 tb TimestampBound 414} 415 416// errTxInitTimeout returns error for timeout in waiting for initialization of 417// the transaction. 418func errTxInitTimeout() error { 419 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 420} 421 422// getTimestampBound returns the read staleness bound specified for the 423// ReadOnlyTransaction. 424func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 425 t.mu.Lock() 426 defer t.mu.Unlock() 427 return t.tb 428} 429 430// begin starts a snapshot read-only Transaction on Cloud Spanner. 431func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 432 var ( 433 locked bool 434 tx transactionID 435 rts time.Time 436 sh *sessionHandle 437 err error 438 res *sppb.Transaction 439 ) 440 defer func() { 441 if !locked { 442 t.mu.Lock() 443 // Not necessary, just to make it clear that t.mu is being held when 444 // locked == true. 445 locked = true 446 } 447 if t.state != txClosed { 448 // Signal other initialization routines. 449 close(t.txReadyOrClosed) 450 t.txReadyOrClosed = make(chan struct{}) 451 } 452 t.mu.Unlock() 453 if err != nil && sh != nil { 454 // Got a valid session handle, but failed to initialize transaction= 455 // on Cloud Spanner. 456 if isSessionNotFoundError(err) { 457 sh.destroy() 458 } 459 // If sh.destroy was already executed, this becomes a noop. 460 sh.recycle() 461 } 462 }() 463 // Retry the BeginTransaction call if a 'Session not found' is returned. 464 for { 465 sh, err = t.sp.take(ctx) 466 if err != nil { 467 return err 468 } 469 res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 470 Session: sh.getID(), 471 Options: &sppb.TransactionOptions{ 472 Mode: &sppb.TransactionOptions_ReadOnly_{ 473 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 474 }, 475 }, 476 }) 477 if isSessionNotFoundError(err) { 478 sh.destroy() 479 continue 480 } else if err == nil { 481 tx = res.Id 482 if res.ReadTimestamp != nil { 483 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 484 } 485 } else { 486 err = toSpannerError(err) 487 } 488 break 489 } 490 t.mu.Lock() 491 492 // defer function will be executed with t.mu being held. 493 locked = true 494 495 // During the execution of t.begin(), t.Close() was invoked. 496 if t.state == txClosed { 497 return errSessionClosed(sh) 498 } 499 500 // If begin() fails, this allows other queries to take over the 501 // initialization. 502 t.tx = nil 503 if err == nil { 504 t.tx = tx 505 t.rts = rts 506 t.sh = sh 507 // State transite to txActive. 508 t.state = txActive 509 } 510 return err 511} 512 513// acquire implements txReadEnv.acquire. 514func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 515 if err := checkNestedTxn(ctx); err != nil { 516 return nil, nil, err 517 } 518 if t.singleUse { 519 return t.acquireSingleUse(ctx) 520 } 521 return t.acquireMultiUse(ctx) 522} 523 524func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 525 t.mu.Lock() 526 defer t.mu.Unlock() 527 switch t.state { 528 case txClosed: 529 // A closed single-use transaction can never be reused. 530 return nil, nil, errTxClosed() 531 case txNew: 532 t.state = txClosed 533 ts := &sppb.TransactionSelector{ 534 Selector: &sppb.TransactionSelector_SingleUse{ 535 SingleUse: &sppb.TransactionOptions{ 536 Mode: &sppb.TransactionOptions_ReadOnly_{ 537 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 538 }, 539 }, 540 }, 541 } 542 sh, err := t.sp.take(ctx) 543 if err != nil { 544 return nil, nil, err 545 } 546 547 // Install session handle into t, which can be used for readonly 548 // operations later. 549 t.sh = sh 550 return sh, ts, nil 551 } 552 us := t.state 553 554 // SingleUse transaction should only be in either txNew state or txClosed 555 // state. 556 return nil, nil, errUnexpectedTxState(us) 557} 558 559func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 560 for { 561 t.mu.Lock() 562 switch t.state { 563 case txClosed: 564 t.mu.Unlock() 565 return nil, nil, errTxClosed() 566 case txNew: 567 // State transit to txInit so that no further TimestampBound change 568 // is accepted. 569 t.state = txInit 570 t.mu.Unlock() 571 continue 572 case txInit: 573 if t.tx != nil { 574 // Wait for a transaction ID to become ready. 575 txReadyOrClosed := t.txReadyOrClosed 576 t.mu.Unlock() 577 select { 578 case <-txReadyOrClosed: 579 // Need to check transaction state again. 580 continue 581 case <-ctx.Done(): 582 // The waiting for initialization is timeout, return error 583 // directly. 584 return nil, nil, errTxInitTimeout() 585 } 586 } 587 // Take the ownership of initializing the transaction. 588 t.tx = transactionID{} 589 t.mu.Unlock() 590 // Begin a read-only transaction. 591 // 592 // TODO: consider adding a transaction option which allow queries to 593 // initiate transactions by themselves. Note that this option might 594 // not be always good because the ID of the new transaction won't 595 // be ready till the query returns some data or completes. 596 if err := t.begin(ctx); err != nil { 597 return nil, nil, err 598 } 599 600 // If t.begin() succeeded, t.state should have been changed to 601 // txActive, so we can just continue here. 602 continue 603 case txActive: 604 sh := t.sh 605 ts := &sppb.TransactionSelector{ 606 Selector: &sppb.TransactionSelector_Id{ 607 Id: t.tx, 608 }, 609 } 610 t.mu.Unlock() 611 return sh, ts, nil 612 } 613 state := t.state 614 t.mu.Unlock() 615 return nil, nil, errUnexpectedTxState(state) 616 } 617} 618 619func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 620 t.mu.Lock() 621 defer t.mu.Unlock() 622 if t.rts.IsZero() { 623 t.rts = ts 624 } 625} 626 627// release implements txReadEnv.release. 628func (t *ReadOnlyTransaction) release(err error) { 629 t.mu.Lock() 630 sh := t.sh 631 t.mu.Unlock() 632 if sh != nil { // sh could be nil if t.acquire() fails. 633 if isSessionNotFoundError(err) { 634 sh.destroy() 635 } 636 if t.singleUse { 637 // If session handle is already destroyed, this becomes a noop. 638 sh.recycle() 639 } 640 } 641} 642 643// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 644// after being closed. 645func (t *ReadOnlyTransaction) Close() { 646 if t.singleUse { 647 return 648 } 649 t.mu.Lock() 650 if t.state != txClosed { 651 t.state = txClosed 652 close(t.txReadyOrClosed) 653 } 654 sh := t.sh 655 t.mu.Unlock() 656 if sh == nil { 657 return 658 } 659 // If session handle is already destroyed, this becomes a noop. If there are 660 // still active queries and if the recycled session is reused before they 661 // complete, Cloud Spanner will cancel them on behalf of the new transaction 662 // on the session. 663 if sh != nil { 664 sh.recycle() 665 } 666} 667 668// Timestamp returns the timestamp chosen to perform reads and queries in this 669// transaction. The value can only be read after some read or query has either 670// returned some data or completed without returning any data. 671func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 672 t.mu.Lock() 673 defer t.mu.Unlock() 674 if t.rts.IsZero() { 675 return t.rts, errRtsUnavailable() 676 } 677 return t.rts, nil 678} 679 680// WithTimestampBound specifies the TimestampBound to use for read or query. 681// This can only be used before the first read or query is invoked. Note: 682// bounded staleness is not available with general ReadOnlyTransactions; use a 683// single-use ReadOnlyTransaction instead. 684// 685// The returned value is the ReadOnlyTransaction so calls can be chained. 686func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 687 t.mu.Lock() 688 defer t.mu.Unlock() 689 if t.state == txNew { 690 // Only allow to set TimestampBound before the first query. 691 t.tb = tb 692 } 693 return t 694} 695 696// ReadWriteTransaction provides a locking read-write transaction. 697// 698// This type of transaction is the only way to write data into Cloud Spanner; 699// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 700// transactions internally. These transactions rely on pessimistic locking and, 701// if necessary, two-phase commit. Locking read-write transactions may abort, 702// requiring the application to retry. However, the interface exposed by 703// (*Client).ReadWriteTransaction eliminates the need for applications to write 704// retry loops explicitly. 705// 706// Locking transactions may be used to atomically read-modify-write data 707// anywhere in a database. This type of transaction is externally consistent. 708// 709// Clients should attempt to minimize the amount of time a transaction is 710// active. Faster transactions commit with higher probability and cause less 711// contention. Cloud Spanner attempts to keep read locks active as long as the 712// transaction continues to do reads. Long periods of inactivity at the client 713// may cause Cloud Spanner to release a transaction's locks and abort it. 714// 715// Reads performed within a transaction acquire locks on the data being 716// read. Writes can only be done at commit time, after all reads have been 717// completed. Conceptually, a read-write transaction consists of zero or more 718// reads or SQL queries followed by a commit. 719// 720// See (*Client).ReadWriteTransaction for an example. 721// 722// Semantics 723// 724// Cloud Spanner can commit the transaction if all read locks it acquired are 725// still valid at commit time, and it is able to acquire write locks for all 726// writes. Cloud Spanner can abort the transaction for any reason. If a commit 727// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 728// not modified any user data in Cloud Spanner. 729// 730// Unless the transaction commits, Cloud Spanner makes no guarantees about how 731// long the transaction's locks were held for. It is an error to use Cloud 732// Spanner locks for any sort of mutual exclusion other than between Cloud 733// Spanner transactions themselves. 734// 735// Aborted transactions 736// 737// Application code does not need to retry explicitly; RunInTransaction will 738// automatically retry a transaction if an attempt results in an abort. The lock 739// priority of a transaction increases after each prior aborted transaction, 740// meaning that the next attempt has a slightly better chance of success than 741// before. 742// 743// Under some circumstances (e.g., many transactions attempting to modify the 744// same row(s)), a transaction can abort many times in a short period before 745// successfully committing. Thus, it is not a good idea to cap the number of 746// retries a transaction can attempt; instead, it is better to limit the total 747// amount of wall time spent retrying. 748// 749// Idle transactions 750// 751// A transaction is considered idle if it has no outstanding reads or SQL 752// queries and has not started a read or SQL query within the last 10 753// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 754// hold on to locks indefinitely. In that case, the commit will fail with error 755// ABORTED. 756// 757// If this behavior is undesirable, periodically executing a simple SQL query 758// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 759// idle. 760type ReadWriteTransaction struct { 761 // txReadOnly contains methods for performing transactional reads. 762 txReadOnly 763 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 764 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 765 // during the initialization of ReadWriteTransaction. 766 tx transactionID 767 // mu protects concurrent access to the internal states of 768 // ReadWriteTransaction. 769 mu sync.Mutex 770 // state is the current transaction status of the read-write transaction. 771 state txState 772 // wb is the set of buffered mutations waiting to be committed. 773 wb []*Mutation 774} 775 776// BufferWrite adds a list of mutations to the set of updates that will be 777// applied when the transaction is committed. It does not actually apply the 778// write until the transaction is committed, so the operation does not block. 779// The effects of the write won't be visible to any reads (including reads done 780// in the same transaction) until the transaction commits. 781// 782// See the example for Client.ReadWriteTransaction. 783func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 784 t.mu.Lock() 785 defer t.mu.Unlock() 786 if t.state == txClosed { 787 return errTxClosed() 788 } 789 if t.state != txActive { 790 return errUnexpectedTxState(t.state) 791 } 792 t.wb = append(t.wb, ms...) 793 return nil 794} 795 796// Update executes a DML statement against the database. It returns the number 797// of affected rows. Update returns an error if the statement is a query. 798// However, the query is executed, and any data read will be validated upon 799// commit. 800func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 801 mode := sppb.ExecuteSqlRequest_NORMAL 802 return t.update(ctx, stmt, QueryOptions{ 803 Mode: &mode, 804 Options: t.qo.Options, 805 }) 806} 807 808// UpdateWithOptions executes a DML statement against the database. It returns 809// the number of affected rows. The sql query execution will be optimized 810// based on the given query options. 811func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 812 return t.update(ctx, stmt, t.qo.merge(opts)) 813} 814 815func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 816 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 817 defer func() { trace.EndSpan(ctx, err) }() 818 req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts) 819 if err != nil { 820 return 0, err 821 } 822 resultSet, err := sh.getClient().ExecuteSql(ctx, req) 823 if err != nil { 824 return 0, toSpannerError(err) 825 } 826 if resultSet.Stats == nil { 827 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 828 } 829 return extractRowCount(resultSet.Stats) 830} 831 832// BatchUpdate groups one or more DML statements and sends them to Spanner in a 833// single RPC. This is an efficient way to execute multiple DML statements. 834// 835// A slice of counts is returned, where each count represents the number of 836// affected rows for the given query at the same index. If an error occurs, 837// counts will be returned up to the query that encountered the error. 838func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 839 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 840 defer func() { trace.EndSpan(ctx, err) }() 841 842 sh, ts, err := t.acquire(ctx) 843 if err != nil { 844 return nil, err 845 } 846 // Cloud Spanner will return "Session not found" on bad sessions. 847 sid := sh.getID() 848 if sid == "" { 849 // Might happen if transaction is closed in the middle of a API call. 850 return nil, errSessionClosed(sh) 851 } 852 853 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 854 for _, st := range stmts { 855 params, paramTypes, err := st.convertParams() 856 if err != nil { 857 return nil, err 858 } 859 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 860 Sql: st.SQL, 861 Params: params, 862 ParamTypes: paramTypes, 863 }) 864 } 865 866 resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{ 867 Session: sh.getID(), 868 Transaction: ts, 869 Statements: sppbStmts, 870 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 871 }) 872 if err != nil { 873 return nil, toSpannerError(err) 874 } 875 876 var counts []int64 877 for _, rs := range resp.ResultSets { 878 count, err := extractRowCount(rs.Stats) 879 if err != nil { 880 return nil, err 881 } 882 counts = append(counts, count) 883 } 884 if resp.Status.Code != 0 { 885 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 886 } 887 return counts, nil 888} 889 890// acquire implements txReadEnv.acquire. 891func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 892 ts := &sppb.TransactionSelector{ 893 Selector: &sppb.TransactionSelector_Id{ 894 Id: t.tx, 895 }, 896 } 897 t.mu.Lock() 898 defer t.mu.Unlock() 899 switch t.state { 900 case txClosed: 901 return nil, nil, errTxClosed() 902 case txActive: 903 return t.sh, ts, nil 904 } 905 return nil, nil, errUnexpectedTxState(t.state) 906} 907 908// release implements txReadEnv.release. 909func (t *ReadWriteTransaction) release(err error) { 910 t.mu.Lock() 911 sh := t.sh 912 t.mu.Unlock() 913 if sh != nil && isSessionNotFoundError(err) { 914 sh.destroy() 915 } 916} 917 918func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { 919 res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 920 Session: sid, 921 Options: &sppb.TransactionOptions{ 922 Mode: &sppb.TransactionOptions_ReadWrite_{ 923 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 924 }, 925 }, 926 }) 927 if err != nil { 928 return nil, err 929 } 930 if res.Id == nil { 931 return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.") 932 } 933 return res.Id, nil 934} 935 936// begin starts a read-write transacton on Cloud Spanner, it is always called 937// before any of the public APIs. 938func (t *ReadWriteTransaction) begin(ctx context.Context) error { 939 if t.tx != nil { 940 t.state = txActive 941 return nil 942 } 943 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 944 if err == nil { 945 t.tx = tx 946 t.state = txActive 947 return nil 948 } 949 if isSessionNotFoundError(err) { 950 t.sh.destroy() 951 } 952 return err 953} 954 955// commit tries to commit a readwrite transaction to Cloud Spanner. It also 956// returns the commit timestamp for the transactions. 957func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { 958 var ts time.Time 959 t.mu.Lock() 960 t.state = txClosed // No further operations after commit. 961 mPb, err := mutationsProto(t.wb) 962 t.mu.Unlock() 963 if err != nil { 964 return ts, err 965 } 966 // In case that sessionHandle was destroyed but transaction body fails to 967 // report it. 968 sid, client := t.sh.getID(), t.sh.getClient() 969 if sid == "" || client == nil { 970 return ts, errSessionClosed(t.sh) 971 } 972 973 res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{ 974 Session: sid, 975 Transaction: &sppb.CommitRequest_TransactionId{ 976 TransactionId: t.tx, 977 }, 978 Mutations: mPb, 979 }) 980 if e != nil { 981 return ts, toSpannerErrorWithCommitInfo(e, true) 982 } 983 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 984 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 985 } 986 if isSessionNotFoundError(err) { 987 t.sh.destroy() 988 } 989 return ts, err 990} 991 992// rollback is called when a commit is aborted or the transaction body runs 993// into error. 994func (t *ReadWriteTransaction) rollback(ctx context.Context) { 995 t.mu.Lock() 996 // Forbid further operations on rollbacked transaction. 997 t.state = txClosed 998 t.mu.Unlock() 999 // In case that sessionHandle was destroyed but transaction body fails to 1000 // report it. 1001 sid, client := t.sh.getID(), t.sh.getClient() 1002 if sid == "" || client == nil { 1003 return 1004 } 1005 err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{ 1006 Session: sid, 1007 TransactionId: t.tx, 1008 }) 1009 if isSessionNotFoundError(err) { 1010 t.sh.destroy() 1011 } 1012} 1013 1014// runInTransaction executes f under a read-write transaction context. 1015func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) { 1016 var ( 1017 ts time.Time 1018 err error 1019 errDuringCommit bool 1020 ) 1021 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 1022 // Try to commit if transaction body returns no error. 1023 ts, err = t.commit(ctx) 1024 errDuringCommit = err != nil 1025 } 1026 if err != nil { 1027 if isAbortErr(err) { 1028 // Retry the transaction using the same session on ABORT error. 1029 // Cloud Spanner will create the new transaction with the previous 1030 // one's wound-wait priority. 1031 return ts, err 1032 } 1033 if isSessionNotFoundError(err) { 1034 t.sh.destroy() 1035 return ts, err 1036 } 1037 // Rollback the transaction unless the error occurred during the 1038 // commit. Executing a rollback after a commit has failed will 1039 // otherwise cause an error. Note that transient errors, such as 1040 // UNAVAILABLE, are already handled in the gRPC layer and do not show 1041 // up here. Context errors (deadline exceeded / canceled) during 1042 // commits are also not rolled back. 1043 if !errDuringCommit { 1044 t.rollback(ctx) 1045 } 1046 return ts, err 1047 } 1048 // err == nil, return commit timestamp. 1049 return ts, nil 1050} 1051 1052// writeOnlyTransaction provides the most efficient way of doing write-only 1053// transactions. It essentially does blind writes to Cloud Spanner. 1054type writeOnlyTransaction struct { 1055 // sp is the session pool which writeOnlyTransaction uses to get Cloud 1056 // Spanner sessions for blind writes. 1057 sp *sessionPool 1058} 1059 1060// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 1061// unless one of the following happens: 1062// 1063// 1) Context times out. 1064// 2) An unretryable error (e.g. database not found) occurs. 1065// 3) There is a malformed Mutation object. 1066func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 1067 var ( 1068 ts time.Time 1069 sh *sessionHandle 1070 ) 1071 mPb, err := mutationsProto(ms) 1072 if err != nil { 1073 // Malformed mutation found, just return the error. 1074 return ts, err 1075 } 1076 1077 // Retry-loop for aborted transactions. 1078 // TODO: Replace with generic retryer. 1079 for { 1080 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 1081 // No usable session for doing the commit, take one from pool. 1082 sh, err = t.sp.take(ctx) 1083 if err != nil { 1084 // sessionPool.Take already retries for session 1085 // creations/retrivals. 1086 return ts, err 1087 } 1088 defer sh.recycle() 1089 } 1090 res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 1091 Session: sh.getID(), 1092 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 1093 SingleUseTransaction: &sppb.TransactionOptions{ 1094 Mode: &sppb.TransactionOptions_ReadWrite_{ 1095 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1096 }, 1097 }, 1098 }, 1099 Mutations: mPb, 1100 }) 1101 if err != nil && !isAbortErr(err) { 1102 if isSessionNotFoundError(err) { 1103 // Discard the bad session. 1104 sh.destroy() 1105 } 1106 return ts, toSpannerErrorWithCommitInfo(err, true) 1107 } else if err == nil { 1108 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1109 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1110 } 1111 break 1112 } 1113 } 1114 return ts, toSpannerError(err) 1115} 1116 1117// isAbortedErr returns true if the error indicates that an gRPC call is 1118// aborted on the server side. 1119func isAbortErr(err error) bool { 1120 if err == nil { 1121 return false 1122 } 1123 if ErrCode(err) == codes.Aborted { 1124 return true 1125 } 1126 return false 1127} 1128