1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 vkit "cloud.google.com/go/spanner/apiv1" 27 "github.com/golang/protobuf/proto" 28 "google.golang.org/api/iterator" 29 sppb "google.golang.org/genproto/googleapis/spanner/v1" 30 "google.golang.org/grpc/codes" 31 "google.golang.org/grpc/status" 32) 33 34// transactionID stores a transaction ID which uniquely identifies a transaction 35// in Cloud Spanner. 36type transactionID []byte 37 38// txReadEnv manages a read-transaction environment consisting of a session 39// handle and a transaction selector. 40type txReadEnv interface { 41 // acquire returns a read-transaction environment that can be used to 42 // perform a transactional read. 43 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 44 // sets the transaction's read timestamp 45 setTimestamp(time.Time) 46 // release should be called at the end of every transactional read to deal 47 // with session recycling. 48 release(error) 49} 50 51// txReadOnly contains methods for doing transactional reads. 52type txReadOnly struct { 53 // read-transaction environment for performing transactional read 54 // operations. 55 txReadEnv 56 57 // Atomic. Only needed for DML statements, but used forall. 58 sequenceNumber int64 59 60 // replaceSessionFunc is a function that can be called to replace the 61 // session that is used by the transaction. This function should only be 62 // defined for single-use transactions that can safely be retried on a 63 // different session. All other transactions will set this function to nil. 64 replaceSessionFunc func(ctx context.Context) error 65 66 // sp is the session pool for allocating a session to execute the read-only 67 // transaction. It is set only once during initialization of the 68 // txReadOnly. 69 sp *sessionPool 70 // sh is the sessionHandle allocated from sp. 71 sh *sessionHandle 72 73 // qo provides options for executing a sql query. 74 qo QueryOptions 75 76 // txOpts provides options for a transaction. 77 txOpts TransactionOptions 78} 79 80// TransactionOptions provides options for a transaction. 81type TransactionOptions struct { 82 CommitOptions CommitOptions 83} 84 85// errSessionClosed returns error for using a recycled/destroyed session 86func errSessionClosed(sh *sessionHandle) error { 87 return spannerErrorf(codes.FailedPrecondition, 88 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 89} 90 91// Read returns a RowIterator for reading multiple rows from the database. 92func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 93 return t.ReadWithOptions(ctx, table, keys, columns, nil) 94} 95 96// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 97func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 98 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 99} 100 101// ReadOptions provides options for reading rows from a database. 102type ReadOptions struct { 103 // The index to use for reading. If non-empty, you can only read columns 104 // that are part of the index key, part of the primary key, or stored in the 105 // index due to a STORING clause in the index definition. 106 Index string 107 108 // The maximum number of rows to read. A limit value less than 1 means no 109 // limit. 110 Limit int 111} 112 113// ReadWithOptions returns a RowIterator for reading multiple rows from the 114// database. Pass a ReadOptions to modify the read operation. 115func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 116 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 117 defer func() { trace.EndSpan(ctx, ri.err) }() 118 var ( 119 sh *sessionHandle 120 ts *sppb.TransactionSelector 121 err error 122 ) 123 kset, err := keys.keySetProto() 124 if err != nil { 125 return &RowIterator{err: err} 126 } 127 if sh, ts, err = t.acquire(ctx); err != nil { 128 return &RowIterator{err: err} 129 } 130 // Cloud Spanner will return "Session not found" on bad sessions. 131 client := sh.getClient() 132 if client == nil { 133 // Might happen if transaction is closed in the middle of a API call. 134 return &RowIterator{err: errSessionClosed(sh)} 135 } 136 index := "" 137 limit := 0 138 if opts != nil { 139 index = opts.Index 140 if opts.Limit > 0 { 141 limit = opts.Limit 142 } 143 } 144 return streamWithReplaceSessionFunc( 145 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 146 sh.session.logger, 147 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 148 return client.StreamingRead(ctx, 149 &sppb.ReadRequest{ 150 Session: t.sh.getID(), 151 Transaction: ts, 152 Table: table, 153 Index: index, 154 Columns: columns, 155 KeySet: kset, 156 ResumeToken: resumeToken, 157 Limit: int64(limit), 158 }) 159 }, 160 t.replaceSessionFunc, 161 t.setTimestamp, 162 t.release, 163 ) 164} 165 166// errRowNotFound returns error for not being able to read the row identified by 167// key. 168func errRowNotFound(table string, key Key) error { 169 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 170} 171 172// errRowNotFoundByIndex returns error for not being able to read the row by index. 173func errRowNotFoundByIndex(table string, key Key, index string) error { 174 return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 175} 176 177// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index. 178func errMultipleRowsFound(table string, key Key, index string) error { 179 return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 180} 181 182// ReadRow reads a single row from the database. 183// 184// If no row is present with the given key, then ReadRow returns an error where 185// spanner.ErrCode(err) is codes.NotFound. 186func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 187 iter := t.Read(ctx, table, key, columns) 188 defer iter.Stop() 189 row, err := iter.Next() 190 switch err { 191 case iterator.Done: 192 return nil, errRowNotFound(table, key) 193 case nil: 194 return row, nil 195 default: 196 return nil, err 197 } 198} 199 200// ReadRowUsingIndex reads a single row from the database using an index. 201// 202// If no row is present with the given index, then ReadRowUsingIndex returns an 203// error where spanner.ErrCode(err) is codes.NotFound. 204// 205// If more than one row received with the given index, then ReadRowUsingIndex 206// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition. 207func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) { 208 iter := t.ReadUsingIndex(ctx, table, index, key, columns) 209 defer iter.Stop() 210 row, err := iter.Next() 211 switch err { 212 case iterator.Done: 213 return nil, errRowNotFoundByIndex(table, key, index) 214 case nil: 215 // If more than one row found, return an error. 216 _, err := iter.Next() 217 switch err { 218 case iterator.Done: 219 return row, nil 220 case nil: 221 return nil, errMultipleRowsFound(table, key, index) 222 default: 223 return nil, err 224 } 225 default: 226 return nil, err 227 } 228} 229 230// QueryOptions provides options for executing a sql query from a database. 231type QueryOptions struct { 232 Mode *sppb.ExecuteSqlRequest_QueryMode 233 Options *sppb.ExecuteSqlRequest_QueryOptions 234} 235 236// merge combines two QueryOptions that the input parameter will have higher 237// order of precedence. 238func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { 239 merged := QueryOptions{ 240 Mode: qo.Mode, 241 Options: &sppb.ExecuteSqlRequest_QueryOptions{}, 242 } 243 if opts.Mode != nil { 244 merged.Mode = opts.Mode 245 } 246 proto.Merge(merged.Options, qo.Options) 247 proto.Merge(merged.Options, opts.Options) 248 return merged 249} 250 251// Query executes a query against the database. It returns a RowIterator for 252// retrieving the resulting rows. 253// 254// Query returns only row data, without a query plan or execution statistics. 255// Use QueryWithStats to get rows along with the plan and statistics. Use 256// AnalyzeQuery to get just the plan. 257func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 258 mode := sppb.ExecuteSqlRequest_NORMAL 259 return t.query(ctx, statement, QueryOptions{ 260 Mode: &mode, 261 Options: t.qo.Options, 262 }) 263} 264 265// QueryWithOptions executes a SQL statment against the database. It returns 266// a RowIterator for retrieving the resulting rows. The sql query execution 267// will be optimized based on the given query options. 268func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator { 269 return t.query(ctx, statement, t.qo.merge(opts)) 270} 271 272// QueryWithStats executes a SQL statement against the database. It returns 273// a RowIterator for retrieving the resulting rows. The RowIterator will also 274// be populated with a query plan and execution statistics. 275func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 276 mode := sppb.ExecuteSqlRequest_PROFILE 277 return t.query(ctx, statement, QueryOptions{ 278 Mode: &mode, 279 Options: t.qo.Options, 280 }) 281} 282 283// AnalyzeQuery returns the query plan for statement. 284func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 285 mode := sppb.ExecuteSqlRequest_PLAN 286 iter := t.query(ctx, statement, QueryOptions{ 287 Mode: &mode, 288 Options: t.qo.Options, 289 }) 290 defer iter.Stop() 291 for { 292 _, err := iter.Next() 293 if err == iterator.Done { 294 break 295 } 296 if err != nil { 297 return nil, err 298 } 299 } 300 if iter.QueryPlan == nil { 301 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 302 } 303 return iter.QueryPlan, nil 304} 305 306func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) { 307 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 308 defer func() { trace.EndSpan(ctx, ri.err) }() 309 req, sh, err := t.prepareExecuteSQL(ctx, statement, options) 310 if err != nil { 311 return &RowIterator{err: err} 312 } 313 client := sh.getClient() 314 return streamWithReplaceSessionFunc( 315 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 316 sh.session.logger, 317 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 318 req.ResumeToken = resumeToken 319 req.Session = t.sh.getID() 320 return client.ExecuteStreamingSql(ctx, req) 321 }, 322 t.replaceSessionFunc, 323 t.setTimestamp, 324 t.release) 325} 326 327func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 328 sh, ts, err := t.acquire(ctx) 329 if err != nil { 330 return nil, nil, err 331 } 332 // Cloud Spanner will return "Session not found" on bad sessions. 333 sid := sh.getID() 334 if sid == "" { 335 // Might happen if transaction is closed in the middle of a API call. 336 return nil, nil, errSessionClosed(sh) 337 } 338 params, paramTypes, err := stmt.convertParams() 339 if err != nil { 340 return nil, nil, err 341 } 342 mode := sppb.ExecuteSqlRequest_NORMAL 343 if options.Mode != nil { 344 mode = *options.Mode 345 } 346 req := &sppb.ExecuteSqlRequest{ 347 Session: sid, 348 Transaction: ts, 349 Sql: stmt.SQL, 350 QueryMode: mode, 351 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 352 Params: params, 353 ParamTypes: paramTypes, 354 QueryOptions: options.Options, 355 } 356 return req, sh, nil 357} 358 359// txState is the status of a transaction. 360type txState int 361 362const ( 363 // transaction is new, waiting to be initialized.. 364 txNew txState = iota 365 // transaction is being initialized. 366 txInit 367 // transaction is active and can perform read/write. 368 txActive 369 // transaction is closed, cannot be used anymore. 370 txClosed 371) 372 373// errRtsUnavailable returns error for read transaction's read timestamp being 374// unavailable. 375func errRtsUnavailable() error { 376 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 377} 378 379// errTxClosed returns error for using a closed transaction. 380func errTxClosed() error { 381 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 382} 383 384// errUnexpectedTxState returns error for transaction enters an unexpected state. 385func errUnexpectedTxState(ts txState) error { 386 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 387} 388 389// ReadOnlyTransaction provides a snapshot transaction with guaranteed 390// consistency across reads, but does not allow writes. Read-only transactions 391// can be configured to read at timestamps in the past. 392// 393// Read-only transactions do not take locks. Instead, they work by choosing a 394// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 395// they do not acquire locks, they do not block concurrent read-write 396// transactions. 397// 398// Unlike locking read-write transactions, read-only transactions never abort. 399// They can fail if the chosen read timestamp is garbage collected; however, the 400// default garbage collection policy is generous enough that most applications 401// do not need to worry about this in practice. See the documentation of 402// TimestampBound for more details. 403// 404// A ReadOnlyTransaction consumes resources on the server until Close is called. 405type ReadOnlyTransaction struct { 406 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 407 mu sync.Mutex 408 // txReadOnly contains methods for performing transactional reads. 409 txReadOnly 410 // singleUse indicates that the transaction can be used for only one read. 411 singleUse bool 412 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 413 // ReadOnlyTransaction. 414 tx transactionID 415 // txReadyOrClosed is for broadcasting that transaction ID has been returned 416 // by Cloud Spanner or that transaction is closed. 417 txReadyOrClosed chan struct{} 418 // state is the current transaction status of the ReadOnly transaction. 419 state txState 420 // rts is the read timestamp returned by transactional reads. 421 rts time.Time 422 // tb is the read staleness bound specification for transactional reads. 423 tb TimestampBound 424} 425 426// errTxInitTimeout returns error for timeout in waiting for initialization of 427// the transaction. 428func errTxInitTimeout() error { 429 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 430} 431 432// getTimestampBound returns the read staleness bound specified for the 433// ReadOnlyTransaction. 434func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 435 t.mu.Lock() 436 defer t.mu.Unlock() 437 return t.tb 438} 439 440// begin starts a snapshot read-only Transaction on Cloud Spanner. 441func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 442 var ( 443 locked bool 444 tx transactionID 445 rts time.Time 446 sh *sessionHandle 447 err error 448 res *sppb.Transaction 449 ) 450 defer func() { 451 if !locked { 452 t.mu.Lock() 453 // Not necessary, just to make it clear that t.mu is being held when 454 // locked == true. 455 locked = true 456 } 457 if t.state != txClosed { 458 // Signal other initialization routines. 459 close(t.txReadyOrClosed) 460 t.txReadyOrClosed = make(chan struct{}) 461 } 462 t.mu.Unlock() 463 if err != nil && sh != nil { 464 // Got a valid session handle, but failed to initialize transaction= 465 // on Cloud Spanner. 466 if isSessionNotFoundError(err) { 467 sh.destroy() 468 } 469 // If sh.destroy was already executed, this becomes a noop. 470 sh.recycle() 471 } 472 }() 473 // Retry the BeginTransaction call if a 'Session not found' is returned. 474 for { 475 sh, err = t.sp.take(ctx) 476 if err != nil { 477 return err 478 } 479 res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 480 Session: sh.getID(), 481 Options: &sppb.TransactionOptions{ 482 Mode: &sppb.TransactionOptions_ReadOnly_{ 483 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 484 }, 485 }, 486 }) 487 if isSessionNotFoundError(err) { 488 sh.destroy() 489 continue 490 } else if err == nil { 491 tx = res.Id 492 if res.ReadTimestamp != nil { 493 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 494 } 495 } else { 496 err = ToSpannerError(err) 497 } 498 break 499 } 500 t.mu.Lock() 501 502 // defer function will be executed with t.mu being held. 503 locked = true 504 505 // During the execution of t.begin(), t.Close() was invoked. 506 if t.state == txClosed { 507 return errSessionClosed(sh) 508 } 509 510 // If begin() fails, this allows other queries to take over the 511 // initialization. 512 t.tx = nil 513 if err == nil { 514 t.tx = tx 515 t.rts = rts 516 t.sh = sh 517 // State transite to txActive. 518 t.state = txActive 519 } 520 return err 521} 522 523// acquire implements txReadEnv.acquire. 524func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 525 if err := checkNestedTxn(ctx); err != nil { 526 return nil, nil, err 527 } 528 if t.singleUse { 529 return t.acquireSingleUse(ctx) 530 } 531 return t.acquireMultiUse(ctx) 532} 533 534func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 535 t.mu.Lock() 536 defer t.mu.Unlock() 537 switch t.state { 538 case txClosed: 539 // A closed single-use transaction can never be reused. 540 return nil, nil, errTxClosed() 541 case txNew: 542 t.state = txClosed 543 ts := &sppb.TransactionSelector{ 544 Selector: &sppb.TransactionSelector_SingleUse{ 545 SingleUse: &sppb.TransactionOptions{ 546 Mode: &sppb.TransactionOptions_ReadOnly_{ 547 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 548 }, 549 }, 550 }, 551 } 552 sh, err := t.sp.take(ctx) 553 if err != nil { 554 return nil, nil, err 555 } 556 557 // Install session handle into t, which can be used for readonly 558 // operations later. 559 t.sh = sh 560 return sh, ts, nil 561 } 562 us := t.state 563 564 // SingleUse transaction should only be in either txNew state or txClosed 565 // state. 566 return nil, nil, errUnexpectedTxState(us) 567} 568 569func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 570 for { 571 t.mu.Lock() 572 switch t.state { 573 case txClosed: 574 t.mu.Unlock() 575 return nil, nil, errTxClosed() 576 case txNew: 577 // State transit to txInit so that no further TimestampBound change 578 // is accepted. 579 t.state = txInit 580 t.mu.Unlock() 581 continue 582 case txInit: 583 if t.tx != nil { 584 // Wait for a transaction ID to become ready. 585 txReadyOrClosed := t.txReadyOrClosed 586 t.mu.Unlock() 587 select { 588 case <-txReadyOrClosed: 589 // Need to check transaction state again. 590 continue 591 case <-ctx.Done(): 592 // The waiting for initialization is timeout, return error 593 // directly. 594 return nil, nil, errTxInitTimeout() 595 } 596 } 597 // Take the ownership of initializing the transaction. 598 t.tx = transactionID{} 599 t.mu.Unlock() 600 // Begin a read-only transaction. 601 // 602 // TODO: consider adding a transaction option which allow queries to 603 // initiate transactions by themselves. Note that this option might 604 // not be always good because the ID of the new transaction won't 605 // be ready till the query returns some data or completes. 606 if err := t.begin(ctx); err != nil { 607 return nil, nil, err 608 } 609 610 // If t.begin() succeeded, t.state should have been changed to 611 // txActive, so we can just continue here. 612 continue 613 case txActive: 614 sh := t.sh 615 ts := &sppb.TransactionSelector{ 616 Selector: &sppb.TransactionSelector_Id{ 617 Id: t.tx, 618 }, 619 } 620 t.mu.Unlock() 621 return sh, ts, nil 622 } 623 state := t.state 624 t.mu.Unlock() 625 return nil, nil, errUnexpectedTxState(state) 626 } 627} 628 629func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 630 t.mu.Lock() 631 defer t.mu.Unlock() 632 if t.rts.IsZero() { 633 t.rts = ts 634 } 635} 636 637// release implements txReadEnv.release. 638func (t *ReadOnlyTransaction) release(err error) { 639 t.mu.Lock() 640 sh := t.sh 641 t.mu.Unlock() 642 if sh != nil { // sh could be nil if t.acquire() fails. 643 if isSessionNotFoundError(err) { 644 sh.destroy() 645 } 646 if t.singleUse { 647 // If session handle is already destroyed, this becomes a noop. 648 sh.recycle() 649 } 650 } 651} 652 653// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 654// after being closed. 655func (t *ReadOnlyTransaction) Close() { 656 if t.singleUse { 657 return 658 } 659 t.mu.Lock() 660 if t.state != txClosed { 661 t.state = txClosed 662 close(t.txReadyOrClosed) 663 } 664 sh := t.sh 665 t.mu.Unlock() 666 if sh == nil { 667 return 668 } 669 // If session handle is already destroyed, this becomes a noop. If there are 670 // still active queries and if the recycled session is reused before they 671 // complete, Cloud Spanner will cancel them on behalf of the new transaction 672 // on the session. 673 if sh != nil { 674 sh.recycle() 675 } 676} 677 678// Timestamp returns the timestamp chosen to perform reads and queries in this 679// transaction. The value can only be read after some read or query has either 680// returned some data or completed without returning any data. 681func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 682 t.mu.Lock() 683 defer t.mu.Unlock() 684 if t.rts.IsZero() { 685 return t.rts, errRtsUnavailable() 686 } 687 return t.rts, nil 688} 689 690// WithTimestampBound specifies the TimestampBound to use for read or query. 691// This can only be used before the first read or query is invoked. Note: 692// bounded staleness is not available with general ReadOnlyTransactions; use a 693// single-use ReadOnlyTransaction instead. 694// 695// The returned value is the ReadOnlyTransaction so calls can be chained. 696func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 697 t.mu.Lock() 698 defer t.mu.Unlock() 699 if t.state == txNew { 700 // Only allow to set TimestampBound before the first query. 701 t.tb = tb 702 } 703 return t 704} 705 706// ReadWriteTransaction provides a locking read-write transaction. 707// 708// This type of transaction is the only way to write data into Cloud Spanner; 709// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 710// transactions internally. These transactions rely on pessimistic locking and, 711// if necessary, two-phase commit. Locking read-write transactions may abort, 712// requiring the application to retry. However, the interface exposed by 713// (*Client).ReadWriteTransaction eliminates the need for applications to write 714// retry loops explicitly. 715// 716// Locking transactions may be used to atomically read-modify-write data 717// anywhere in a database. This type of transaction is externally consistent. 718// 719// Clients should attempt to minimize the amount of time a transaction is 720// active. Faster transactions commit with higher probability and cause less 721// contention. Cloud Spanner attempts to keep read locks active as long as the 722// transaction continues to do reads. Long periods of inactivity at the client 723// may cause Cloud Spanner to release a transaction's locks and abort it. 724// 725// Reads performed within a transaction acquire locks on the data being 726// read. Writes can only be done at commit time, after all reads have been 727// completed. Conceptually, a read-write transaction consists of zero or more 728// reads or SQL queries followed by a commit. 729// 730// See (*Client).ReadWriteTransaction for an example. 731// 732// Semantics 733// 734// Cloud Spanner can commit the transaction if all read locks it acquired are 735// still valid at commit time, and it is able to acquire write locks for all 736// writes. Cloud Spanner can abort the transaction for any reason. If a commit 737// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 738// not modified any user data in Cloud Spanner. 739// 740// Unless the transaction commits, Cloud Spanner makes no guarantees about how 741// long the transaction's locks were held for. It is an error to use Cloud 742// Spanner locks for any sort of mutual exclusion other than between Cloud 743// Spanner transactions themselves. 744// 745// Aborted transactions 746// 747// Application code does not need to retry explicitly; RunInTransaction will 748// automatically retry a transaction if an attempt results in an abort. The lock 749// priority of a transaction increases after each prior aborted transaction, 750// meaning that the next attempt has a slightly better chance of success than 751// before. 752// 753// Under some circumstances (e.g., many transactions attempting to modify the 754// same row(s)), a transaction can abort many times in a short period before 755// successfully committing. Thus, it is not a good idea to cap the number of 756// retries a transaction can attempt; instead, it is better to limit the total 757// amount of wall time spent retrying. 758// 759// Idle transactions 760// 761// A transaction is considered idle if it has no outstanding reads or SQL 762// queries and has not started a read or SQL query within the last 10 763// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 764// hold on to locks indefinitely. In that case, the commit will fail with error 765// ABORTED. 766// 767// If this behavior is undesirable, periodically executing a simple SQL query 768// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 769// idle. 770type ReadWriteTransaction struct { 771 // txReadOnly contains methods for performing transactional reads. 772 txReadOnly 773 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 774 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 775 // during the initialization of ReadWriteTransaction. 776 tx transactionID 777 // mu protects concurrent access to the internal states of 778 // ReadWriteTransaction. 779 mu sync.Mutex 780 // state is the current transaction status of the read-write transaction. 781 state txState 782 // wb is the set of buffered mutations waiting to be committed. 783 wb []*Mutation 784} 785 786// BufferWrite adds a list of mutations to the set of updates that will be 787// applied when the transaction is committed. It does not actually apply the 788// write until the transaction is committed, so the operation does not block. 789// The effects of the write won't be visible to any reads (including reads done 790// in the same transaction) until the transaction commits. 791// 792// See the example for Client.ReadWriteTransaction. 793func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 794 t.mu.Lock() 795 defer t.mu.Unlock() 796 if t.state == txClosed { 797 return errTxClosed() 798 } 799 if t.state != txActive { 800 return errUnexpectedTxState(t.state) 801 } 802 t.wb = append(t.wb, ms...) 803 return nil 804} 805 806// Update executes a DML statement against the database. It returns the number 807// of affected rows. Update returns an error if the statement is a query. 808// However, the query is executed, and any data read will be validated upon 809// commit. 810func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 811 mode := sppb.ExecuteSqlRequest_NORMAL 812 return t.update(ctx, stmt, QueryOptions{ 813 Mode: &mode, 814 Options: t.qo.Options, 815 }) 816} 817 818// UpdateWithOptions executes a DML statement against the database. It returns 819// the number of affected rows. The sql query execution will be optimized 820// based on the given query options. 821func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 822 return t.update(ctx, stmt, t.qo.merge(opts)) 823} 824 825func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 826 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 827 defer func() { trace.EndSpan(ctx, err) }() 828 req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts) 829 if err != nil { 830 return 0, err 831 } 832 resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) 833 if err != nil { 834 return 0, ToSpannerError(err) 835 } 836 if resultSet.Stats == nil { 837 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 838 } 839 return extractRowCount(resultSet.Stats) 840} 841 842// BatchUpdate groups one or more DML statements and sends them to Spanner in a 843// single RPC. This is an efficient way to execute multiple DML statements. 844// 845// A slice of counts is returned, where each count represents the number of 846// affected rows for the given query at the same index. If an error occurs, 847// counts will be returned up to the query that encountered the error. 848func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 849 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 850 defer func() { trace.EndSpan(ctx, err) }() 851 852 sh, ts, err := t.acquire(ctx) 853 if err != nil { 854 return nil, err 855 } 856 // Cloud Spanner will return "Session not found" on bad sessions. 857 sid := sh.getID() 858 if sid == "" { 859 // Might happen if transaction is closed in the middle of a API call. 860 return nil, errSessionClosed(sh) 861 } 862 863 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 864 for _, st := range stmts { 865 params, paramTypes, err := st.convertParams() 866 if err != nil { 867 return nil, err 868 } 869 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 870 Sql: st.SQL, 871 Params: params, 872 ParamTypes: paramTypes, 873 }) 874 } 875 876 resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{ 877 Session: sh.getID(), 878 Transaction: ts, 879 Statements: sppbStmts, 880 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 881 }) 882 if err != nil { 883 return nil, ToSpannerError(err) 884 } 885 886 var counts []int64 887 for _, rs := range resp.ResultSets { 888 count, err := extractRowCount(rs.Stats) 889 if err != nil { 890 return nil, err 891 } 892 counts = append(counts, count) 893 } 894 if resp.Status != nil && resp.Status.Code != 0 { 895 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 896 } 897 return counts, nil 898} 899 900// acquire implements txReadEnv.acquire. 901func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 902 ts := &sppb.TransactionSelector{ 903 Selector: &sppb.TransactionSelector_Id{ 904 Id: t.tx, 905 }, 906 } 907 t.mu.Lock() 908 defer t.mu.Unlock() 909 switch t.state { 910 case txClosed: 911 return nil, nil, errTxClosed() 912 case txActive: 913 return t.sh, ts, nil 914 } 915 return nil, nil, errUnexpectedTxState(t.state) 916} 917 918// release implements txReadEnv.release. 919func (t *ReadWriteTransaction) release(err error) { 920 t.mu.Lock() 921 sh := t.sh 922 t.mu.Unlock() 923 if sh != nil && isSessionNotFoundError(err) { 924 sh.destroy() 925 } 926} 927 928func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { 929 res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 930 Session: sid, 931 Options: &sppb.TransactionOptions{ 932 Mode: &sppb.TransactionOptions_ReadWrite_{ 933 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 934 }, 935 }, 936 }) 937 if err != nil { 938 return nil, err 939 } 940 if res.Id == nil { 941 return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.") 942 } 943 return res.Id, nil 944} 945 946// begin starts a read-write transacton on Cloud Spanner, it is always called 947// before any of the public APIs. 948func (t *ReadWriteTransaction) begin(ctx context.Context) error { 949 if t.tx != nil { 950 t.state = txActive 951 return nil 952 } 953 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 954 if err == nil { 955 t.tx = tx 956 t.state = txActive 957 return nil 958 } 959 if isSessionNotFoundError(err) { 960 t.sh.destroy() 961 } 962 return err 963} 964 965// CommitResponse provides a response of a transaction commit in a database. 966type CommitResponse struct { 967 // CommitTs is the commit time for a transaction. 968 CommitTs time.Time 969 // CommitStats is the commit statistics for a transaction. 970 CommitStats *sppb.CommitResponse_CommitStats 971} 972 973// CommitOptions provides options for commiting a transaction in a database. 974type CommitOptions struct { 975 ReturnCommitStats bool 976} 977 978// commit tries to commit a readwrite transaction to Cloud Spanner. It also 979// returns the commit response for the transactions. 980func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) { 981 resp := CommitResponse{} 982 t.mu.Lock() 983 t.state = txClosed // No further operations after commit. 984 mPb, err := mutationsProto(t.wb) 985 t.mu.Unlock() 986 if err != nil { 987 return resp, err 988 } 989 // In case that sessionHandle was destroyed but transaction body fails to 990 // report it. 991 sid, client := t.sh.getID(), t.sh.getClient() 992 if sid == "" || client == nil { 993 return resp, errSessionClosed(t.sh) 994 } 995 996 res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{ 997 Session: sid, 998 Transaction: &sppb.CommitRequest_TransactionId{ 999 TransactionId: t.tx, 1000 }, 1001 Mutations: mPb, 1002 ReturnCommitStats: options.ReturnCommitStats, 1003 }) 1004 if e != nil { 1005 return resp, toSpannerErrorWithCommitInfo(e, true) 1006 } 1007 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1008 resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1009 } 1010 if options.ReturnCommitStats { 1011 resp.CommitStats = res.CommitStats 1012 } 1013 if isSessionNotFoundError(err) { 1014 t.sh.destroy() 1015 } 1016 return resp, err 1017} 1018 1019// rollback is called when a commit is aborted or the transaction body runs 1020// into error. 1021func (t *ReadWriteTransaction) rollback(ctx context.Context) { 1022 t.mu.Lock() 1023 // Forbid further operations on rollbacked transaction. 1024 t.state = txClosed 1025 t.mu.Unlock() 1026 // In case that sessionHandle was destroyed but transaction body fails to 1027 // report it. 1028 sid, client := t.sh.getID(), t.sh.getClient() 1029 if sid == "" || client == nil { 1030 return 1031 } 1032 err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{ 1033 Session: sid, 1034 TransactionId: t.tx, 1035 }) 1036 if isSessionNotFoundError(err) { 1037 t.sh.destroy() 1038 } 1039} 1040 1041// runInTransaction executes f under a read-write transaction context. 1042func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) { 1043 var ( 1044 resp CommitResponse 1045 err error 1046 errDuringCommit bool 1047 ) 1048 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 1049 // Try to commit if transaction body returns no error. 1050 resp, err = t.commit(ctx, t.txOpts.CommitOptions) 1051 errDuringCommit = err != nil 1052 } 1053 if err != nil { 1054 if isAbortedErr(err) { 1055 // Retry the transaction using the same session on ABORT error. 1056 // Cloud Spanner will create the new transaction with the previous 1057 // one's wound-wait priority. 1058 return resp, err 1059 } 1060 if isSessionNotFoundError(err) { 1061 t.sh.destroy() 1062 return resp, err 1063 } 1064 // Rollback the transaction unless the error occurred during the 1065 // commit. Executing a rollback after a commit has failed will 1066 // otherwise cause an error. Note that transient errors, such as 1067 // UNAVAILABLE, are already handled in the gRPC layer and do not show 1068 // up here. Context errors (deadline exceeded / canceled) during 1069 // commits are also not rolled back. 1070 if !errDuringCommit { 1071 t.rollback(ctx) 1072 } 1073 return resp, err 1074 } 1075 // err == nil, return commit response. 1076 return resp, nil 1077} 1078 1079// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in 1080// order to run a read-write transaction in a statement-based way. 1081// 1082// This struct is returned by NewReadWriteStmtBasedTransaction and contains 1083// Commit() and Rollback() methods to end a transaction. 1084type ReadWriteStmtBasedTransaction struct { 1085 // ReadWriteTransaction contains methods for performing transactional reads. 1086 ReadWriteTransaction 1087 1088 options TransactionOptions 1089} 1090 1091// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or 1092// Rollback() must be called to end a transaction. If Commit() or Rollback() is 1093// not called, the session that is used by the transaction will not be returned 1094// to the pool and cause a session leak. 1095// 1096// This method should only be used when manual error handling and retry 1097// management is needed. Cloud Spanner may abort a read/write transaction at any 1098// moment, and each statement that is executed on the transaction should be 1099// checked for an Aborted error, including queries and read operations. 1100// 1101// For most use cases, client.ReadWriteTransaction should be used, as it will 1102// handle all Aborted and 'Session not found' errors automatically. 1103func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) { 1104 return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{}) 1105} 1106 1107// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction 1108// with configurable options. Commit() or Rollback() must be called to end a 1109// transaction. If Commit() or Rollback() is not called, the session that is 1110// used by the transaction will not be returned to the pool and cause a session 1111// leak. 1112// 1113// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of 1114// NewReadWriteStmtBasedTransaction. 1115func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) { 1116 var ( 1117 sh *sessionHandle 1118 err error 1119 t *ReadWriteStmtBasedTransaction 1120 ) 1121 sh, err = c.idleSessions.takeWriteSession(ctx) 1122 if err != nil { 1123 // If session retrieval fails, just fail the transaction. 1124 return nil, err 1125 } 1126 t = &ReadWriteStmtBasedTransaction{ 1127 ReadWriteTransaction: ReadWriteTransaction{ 1128 tx: sh.getTransactionID(), 1129 }, 1130 } 1131 t.txReadOnly.sh = sh 1132 t.txReadOnly.txReadEnv = t 1133 t.txReadOnly.qo = c.qo 1134 t.txOpts = options 1135 1136 if err = t.begin(ctx); err != nil { 1137 if sh != nil { 1138 sh.recycle() 1139 } 1140 return nil, err 1141 } 1142 return t, err 1143} 1144 1145// Commit tries to commit a readwrite transaction to Cloud Spanner. It also 1146// returns the commit timestamp for the transactions. 1147func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) { 1148 resp, err := t.CommitWithReturnResp(ctx) 1149 return resp.CommitTs, err 1150} 1151 1152// CommitWithReturnResp tries to commit a readwrite transaction. It also returns 1153// the commit timestamp and stats for the transactions. 1154func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context) (CommitResponse, error) { 1155 resp, err := t.commit(ctx, t.txOpts.CommitOptions) 1156 // Rolling back an aborted transaction is not necessary. 1157 if err != nil && status.Code(err) != codes.Aborted { 1158 t.rollback(ctx) 1159 } 1160 if t.sh != nil { 1161 t.sh.recycle() 1162 } 1163 return resp, err 1164} 1165 1166// Rollback is called to cancel the ongoing transaction that has not been 1167// committed yet. 1168func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) { 1169 t.rollback(ctx) 1170 if t.sh != nil { 1171 t.sh.recycle() 1172 } 1173} 1174 1175// writeOnlyTransaction provides the most efficient way of doing write-only 1176// transactions. It essentially does blind writes to Cloud Spanner. 1177type writeOnlyTransaction struct { 1178 // sp is the session pool which writeOnlyTransaction uses to get Cloud 1179 // Spanner sessions for blind writes. 1180 sp *sessionPool 1181} 1182 1183// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 1184// unless one of the following happens: 1185// 1186// 1) Context times out. 1187// 2) An unretryable error (e.g. database not found) occurs. 1188// 3) There is a malformed Mutation object. 1189func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 1190 var ( 1191 ts time.Time 1192 sh *sessionHandle 1193 ) 1194 mPb, err := mutationsProto(ms) 1195 if err != nil { 1196 // Malformed mutation found, just return the error. 1197 return ts, err 1198 } 1199 1200 // Retry-loop for aborted transactions. 1201 // TODO: Replace with generic retryer. 1202 for { 1203 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 1204 // No usable session for doing the commit, take one from pool. 1205 sh, err = t.sp.take(ctx) 1206 if err != nil { 1207 // sessionPool.Take already retries for session 1208 // creations/retrivals. 1209 return ts, err 1210 } 1211 defer sh.recycle() 1212 } 1213 res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 1214 Session: sh.getID(), 1215 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 1216 SingleUseTransaction: &sppb.TransactionOptions{ 1217 Mode: &sppb.TransactionOptions_ReadWrite_{ 1218 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1219 }, 1220 }, 1221 }, 1222 Mutations: mPb, 1223 }) 1224 if err != nil && !isAbortedErr(err) { 1225 if isSessionNotFoundError(err) { 1226 // Discard the bad session. 1227 sh.destroy() 1228 } 1229 return ts, toSpannerErrorWithCommitInfo(err, true) 1230 } else if err == nil { 1231 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1232 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1233 } 1234 break 1235 } 1236 } 1237 return ts, ToSpannerError(err) 1238} 1239 1240// isAbortedErr returns true if the error indicates that an gRPC call is 1241// aborted on the server side. 1242func isAbortedErr(err error) bool { 1243 if err == nil { 1244 return false 1245 } 1246 if ErrCode(err) == codes.Aborted { 1247 return true 1248 } 1249 return false 1250} 1251