/*
Copyright 2017 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package spanner

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"cloud.google.com/go/internal/trace"
	vkit "cloud.google.com/go/spanner/apiv1"
	"github.com/golang/protobuf/proto"
	"google.golang.org/api/iterator"
	sppb "google.golang.org/genproto/googleapis/spanner/v1"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// transactionID stores a transaction ID which uniquely identifies a transaction
// in Cloud Spanner.
type transactionID []byte

// txReadEnv manages a read-transaction environment consisting of a session
// handle and a transaction selector.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to
	// perform a transactional read.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp sets the transaction's read timestamp.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal
	// with session recycling.
	release(error)
}

// txReadOnly contains methods for doing transactional reads.
type txReadOnly struct {
	// read-transaction environment for performing transactional read
	// operations.
	txReadEnv

	// Atomic. Only needed for DML statements, but used for all.
	sequenceNumber int64

	// replaceSessionFunc is a function that can be called to replace the
	// session that is used by the transaction. This function should only be
	// defined for single-use transactions that can safely be retried on a
	// different session. All other transactions will set this function to nil.
	replaceSessionFunc func(ctx context.Context) error

	// sp is the session pool for allocating a session to execute the read-only
	// transaction. It is set only once during initialization of the
	// txReadOnly.
	sp *sessionPool
	// sh is the sessionHandle allocated from sp.
	sh *sessionHandle

	// qo provides options for executing a sql query.
	qo QueryOptions

	// txOpts provides options for a transaction.
	txOpts TransactionOptions
}

// Internal interface for types that can configure the priority of an RPC.
type requestPrioritizer interface {
	// requestPriority returns the priority that should be attached to the
	// RPC.
	requestPriority() sppb.RequestOptions_Priority
}

// TransactionOptions provides options for a transaction.
type TransactionOptions struct {
	CommitOptions CommitOptions

	// CommitPriority is the priority to use for the Commit RPC for the
	// transaction.
	CommitPriority sppb.RequestOptions_Priority
}

// requestPriority implements requestPrioritizer by returning CommitPriority.
func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority {
	return to.CommitPriority
}

// errSessionClosed returns error for using a recycled/destroyed session
func errSessionClosed(sh *sessionHandle) error {
	return spannerErrorf(codes.FailedPrecondition,
		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
}

// Read returns a RowIterator for reading multiple rows from the database.
// It is shorthand for ReadWithOptions with nil options.
func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
	return t.ReadWithOptions(ctx, table, keys, columns, nil)
}

// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
110func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 111 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 112} 113 114// ReadOptions provides options for reading rows from a database. 115type ReadOptions struct { 116 // The index to use for reading. If non-empty, you can only read columns 117 // that are part of the index key, part of the primary key, or stored in the 118 // index due to a STORING clause in the index definition. 119 Index string 120 121 // The maximum number of rows to read. A limit value less than 1 means no 122 // limit. 123 Limit int 124 125 // Priority is the RPC priority to use for the read operation. 126 Priority sppb.RequestOptions_Priority 127} 128 129func (ro *ReadOptions) requestPriority() sppb.RequestOptions_Priority { 130 return ro.Priority 131} 132 133// ReadWithOptions returns a RowIterator for reading multiple rows from the 134// database. Pass a ReadOptions to modify the read operation. 135func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 136 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 137 defer func() { trace.EndSpan(ctx, ri.err) }() 138 var ( 139 sh *sessionHandle 140 ts *sppb.TransactionSelector 141 err error 142 ) 143 kset, err := keys.keySetProto() 144 if err != nil { 145 return &RowIterator{err: err} 146 } 147 if sh, ts, err = t.acquire(ctx); err != nil { 148 return &RowIterator{err: err} 149 } 150 // Cloud Spanner will return "Session not found" on bad sessions. 151 client := sh.getClient() 152 if client == nil { 153 // Might happen if transaction is closed in the middle of a API call. 
154 return &RowIterator{err: errSessionClosed(sh)} 155 } 156 index := "" 157 limit := 0 158 var ro *sppb.RequestOptions 159 if opts != nil { 160 index = opts.Index 161 if opts.Limit > 0 { 162 limit = opts.Limit 163 } 164 ro = createRequestOptions(opts) 165 } 166 return streamWithReplaceSessionFunc( 167 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 168 sh.session.logger, 169 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 170 return client.StreamingRead(ctx, 171 &sppb.ReadRequest{ 172 Session: t.sh.getID(), 173 Transaction: ts, 174 Table: table, 175 Index: index, 176 Columns: columns, 177 KeySet: kset, 178 ResumeToken: resumeToken, 179 Limit: int64(limit), 180 RequestOptions: ro, 181 }) 182 }, 183 t.replaceSessionFunc, 184 t.setTimestamp, 185 t.release, 186 ) 187} 188 189// errRowNotFound returns error for not being able to read the row identified by 190// key. 191func errRowNotFound(table string, key Key) error { 192 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 193} 194 195// errRowNotFoundByIndex returns error for not being able to read the row by index. 196func errRowNotFoundByIndex(table string, key Key, index string) error { 197 return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 198} 199 200// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index. 201func errMultipleRowsFound(table string, key Key, index string) error { 202 return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index) 203} 204 205// ReadRow reads a single row from the database. 206// 207// If no row is present with the given key, then ReadRow returns an error where 208// spanner.ErrCode(err) is codes.NotFound. 
209func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 210 iter := t.Read(ctx, table, key, columns) 211 defer iter.Stop() 212 row, err := iter.Next() 213 switch err { 214 case iterator.Done: 215 return nil, errRowNotFound(table, key) 216 case nil: 217 return row, nil 218 default: 219 return nil, err 220 } 221} 222 223// ReadRowUsingIndex reads a single row from the database using an index. 224// 225// If no row is present with the given index, then ReadRowUsingIndex returns an 226// error where spanner.ErrCode(err) is codes.NotFound. 227// 228// If more than one row received with the given index, then ReadRowUsingIndex 229// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition. 230func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) { 231 iter := t.ReadUsingIndex(ctx, table, index, key, columns) 232 defer iter.Stop() 233 row, err := iter.Next() 234 switch err { 235 case iterator.Done: 236 return nil, errRowNotFoundByIndex(table, key, index) 237 case nil: 238 // If more than one row found, return an error. 239 _, err := iter.Next() 240 switch err { 241 case iterator.Done: 242 return row, nil 243 case nil: 244 return nil, errMultipleRowsFound(table, key, index) 245 default: 246 return nil, err 247 } 248 default: 249 return nil, err 250 } 251} 252 253// QueryOptions provides options for executing a sql query or update statement. 254type QueryOptions struct { 255 Mode *sppb.ExecuteSqlRequest_QueryMode 256 Options *sppb.ExecuteSqlRequest_QueryOptions 257 258 // Priority is the RPC priority to use for the query/update. 259 Priority sppb.RequestOptions_Priority 260} 261 262func (qo *QueryOptions) requestPriority() sppb.RequestOptions_Priority { 263 return qo.Priority 264} 265 266// merge combines two QueryOptions that the input parameter will have higher 267// order of precedence. 
268func (qo QueryOptions) merge(opts QueryOptions) QueryOptions { 269 merged := QueryOptions{ 270 Mode: qo.Mode, 271 Options: &sppb.ExecuteSqlRequest_QueryOptions{}, 272 Priority: qo.Priority, 273 } 274 if opts.Mode != nil { 275 merged.Mode = opts.Mode 276 } 277 if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED { 278 merged.Priority = opts.Priority 279 } 280 proto.Merge(merged.Options, qo.Options) 281 proto.Merge(merged.Options, opts.Options) 282 return merged 283} 284 285func createRequestOptions(prioritizer requestPrioritizer) (ro *sppb.RequestOptions) { 286 if prioritizer == nil { 287 return nil 288 } 289 if prioritizer.requestPriority() != sppb.RequestOptions_PRIORITY_UNSPECIFIED { 290 ro = &sppb.RequestOptions{Priority: prioritizer.requestPriority()} 291 } 292 return ro 293} 294 295// Query executes a query against the database. It returns a RowIterator for 296// retrieving the resulting rows. 297// 298// Query returns only row data, without a query plan or execution statistics. 299// Use QueryWithStats to get rows along with the plan and statistics. Use 300// AnalyzeQuery to get just the plan. 301func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 302 mode := sppb.ExecuteSqlRequest_NORMAL 303 return t.query(ctx, statement, QueryOptions{ 304 Mode: &mode, 305 Options: t.qo.Options, 306 }) 307} 308 309// QueryWithOptions executes a SQL statment against the database. It returns 310// a RowIterator for retrieving the resulting rows. The sql query execution 311// will be optimized based on the given query options. 312func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator { 313 return t.query(ctx, statement, t.qo.merge(opts)) 314} 315 316// QueryWithStats executes a SQL statement against the database. It returns 317// a RowIterator for retrieving the resulting rows. The RowIterator will also 318// be populated with a query plan and execution statistics. 
319func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 320 mode := sppb.ExecuteSqlRequest_PROFILE 321 return t.query(ctx, statement, QueryOptions{ 322 Mode: &mode, 323 Options: t.qo.Options, 324 }) 325} 326 327// AnalyzeQuery returns the query plan for statement. 328func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 329 mode := sppb.ExecuteSqlRequest_PLAN 330 iter := t.query(ctx, statement, QueryOptions{ 331 Mode: &mode, 332 Options: t.qo.Options, 333 }) 334 defer iter.Stop() 335 for { 336 _, err := iter.Next() 337 if err == iterator.Done { 338 break 339 } 340 if err != nil { 341 return nil, err 342 } 343 } 344 if iter.QueryPlan == nil { 345 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 346 } 347 return iter.QueryPlan, nil 348} 349 350func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) { 351 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 352 defer func() { trace.EndSpan(ctx, ri.err) }() 353 req, sh, err := t.prepareExecuteSQL(ctx, statement, options) 354 if err != nil { 355 return &RowIterator{err: err} 356 } 357 client := sh.getClient() 358 return streamWithReplaceSessionFunc( 359 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 360 sh.session.logger, 361 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 362 req.ResumeToken = resumeToken 363 req.Session = t.sh.getID() 364 return client.ExecuteStreamingSql(ctx, req) 365 }, 366 t.replaceSessionFunc, 367 t.setTimestamp, 368 t.release) 369} 370 371func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 372 sh, ts, err := t.acquire(ctx) 373 if err != nil { 374 return nil, nil, err 375 } 376 // Cloud Spanner will return "Session not found" on bad sessions. 
377 sid := sh.getID() 378 if sid == "" { 379 // Might happen if transaction is closed in the middle of a API call. 380 return nil, nil, errSessionClosed(sh) 381 } 382 params, paramTypes, err := stmt.convertParams() 383 if err != nil { 384 return nil, nil, err 385 } 386 mode := sppb.ExecuteSqlRequest_NORMAL 387 if options.Mode != nil { 388 mode = *options.Mode 389 } 390 req := &sppb.ExecuteSqlRequest{ 391 Session: sid, 392 Transaction: ts, 393 Sql: stmt.SQL, 394 QueryMode: mode, 395 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 396 Params: params, 397 ParamTypes: paramTypes, 398 QueryOptions: options.Options, 399 RequestOptions: createRequestOptions(&options), 400 } 401 return req, sh, nil 402} 403 404// txState is the status of a transaction. 405type txState int 406 407const ( 408 // transaction is new, waiting to be initialized.. 409 txNew txState = iota 410 // transaction is being initialized. 411 txInit 412 // transaction is active and can perform read/write. 413 txActive 414 // transaction is closed, cannot be used anymore. 415 txClosed 416) 417 418// errRtsUnavailable returns error for read transaction's read timestamp being 419// unavailable. 420func errRtsUnavailable() error { 421 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 422} 423 424// errTxClosed returns error for using a closed transaction. 425func errTxClosed() error { 426 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 427} 428 429// errUnexpectedTxState returns error for transaction enters an unexpected state. 430func errUnexpectedTxState(ts txState) error { 431 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 432} 433 434// ReadOnlyTransaction provides a snapshot transaction with guaranteed 435// consistency across reads, but does not allow writes. Read-only transactions 436// can be configured to read at timestamps in the past. 437// 438// Read-only transactions do not take locks. 
Instead, they work by choosing a 439// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 440// they do not acquire locks, they do not block concurrent read-write 441// transactions. 442// 443// Unlike locking read-write transactions, read-only transactions never abort. 444// They can fail if the chosen read timestamp is garbage collected; however, the 445// default garbage collection policy is generous enough that most applications 446// do not need to worry about this in practice. See the documentation of 447// TimestampBound for more details. 448// 449// A ReadOnlyTransaction consumes resources on the server until Close is called. 450type ReadOnlyTransaction struct { 451 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 452 mu sync.Mutex 453 // txReadOnly contains methods for performing transactional reads. 454 txReadOnly 455 // singleUse indicates that the transaction can be used for only one read. 456 singleUse bool 457 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 458 // ReadOnlyTransaction. 459 tx transactionID 460 // txReadyOrClosed is for broadcasting that transaction ID has been returned 461 // by Cloud Spanner or that transaction is closed. 462 txReadyOrClosed chan struct{} 463 // state is the current transaction status of the ReadOnly transaction. 464 state txState 465 // rts is the read timestamp returned by transactional reads. 466 rts time.Time 467 // tb is the read staleness bound specification for transactional reads. 468 tb TimestampBound 469} 470 471// errTxInitTimeout returns error for timeout in waiting for initialization of 472// the transaction. 473func errTxInitTimeout() error { 474 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 475} 476 477// getTimestampBound returns the read staleness bound specified for the 478// ReadOnlyTransaction. 
479func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 480 t.mu.Lock() 481 defer t.mu.Unlock() 482 return t.tb 483} 484 485// begin starts a snapshot read-only Transaction on Cloud Spanner. 486func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 487 var ( 488 locked bool 489 tx transactionID 490 rts time.Time 491 sh *sessionHandle 492 err error 493 res *sppb.Transaction 494 ) 495 defer func() { 496 if !locked { 497 t.mu.Lock() 498 // Not necessary, just to make it clear that t.mu is being held when 499 // locked == true. 500 locked = true 501 } 502 if t.state != txClosed { 503 // Signal other initialization routines. 504 close(t.txReadyOrClosed) 505 t.txReadyOrClosed = make(chan struct{}) 506 } 507 t.mu.Unlock() 508 if err != nil && sh != nil { 509 // Got a valid session handle, but failed to initialize transaction= 510 // on Cloud Spanner. 511 if isSessionNotFoundError(err) { 512 sh.destroy() 513 } 514 // If sh.destroy was already executed, this becomes a noop. 515 sh.recycle() 516 } 517 }() 518 // Retry the BeginTransaction call if a 'Session not found' is returned. 519 for { 520 sh, err = t.sp.take(ctx) 521 if err != nil { 522 return err 523 } 524 res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ 525 Session: sh.getID(), 526 Options: &sppb.TransactionOptions{ 527 Mode: &sppb.TransactionOptions_ReadOnly_{ 528 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 529 }, 530 }, 531 }) 532 if isSessionNotFoundError(err) { 533 sh.destroy() 534 continue 535 } else if err == nil { 536 tx = res.Id 537 if res.ReadTimestamp != nil { 538 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 539 } 540 } else { 541 err = ToSpannerError(err) 542 } 543 break 544 } 545 t.mu.Lock() 546 547 // defer function will be executed with t.mu being held. 548 locked = true 549 550 // During the execution of t.begin(), t.Close() was invoked. 
551 if t.state == txClosed { 552 return errSessionClosed(sh) 553 } 554 555 // If begin() fails, this allows other queries to take over the 556 // initialization. 557 t.tx = nil 558 if err == nil { 559 t.tx = tx 560 t.rts = rts 561 t.sh = sh 562 // State transite to txActive. 563 t.state = txActive 564 } 565 return err 566} 567 568// acquire implements txReadEnv.acquire. 569func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 570 if err := checkNestedTxn(ctx); err != nil { 571 return nil, nil, err 572 } 573 if t.singleUse { 574 return t.acquireSingleUse(ctx) 575 } 576 return t.acquireMultiUse(ctx) 577} 578 579func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 580 t.mu.Lock() 581 defer t.mu.Unlock() 582 switch t.state { 583 case txClosed: 584 // A closed single-use transaction can never be reused. 585 return nil, nil, errTxClosed() 586 case txNew: 587 t.state = txClosed 588 ts := &sppb.TransactionSelector{ 589 Selector: &sppb.TransactionSelector_SingleUse{ 590 SingleUse: &sppb.TransactionOptions{ 591 Mode: &sppb.TransactionOptions_ReadOnly_{ 592 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 593 }, 594 }, 595 }, 596 } 597 sh, err := t.sp.take(ctx) 598 if err != nil { 599 return nil, nil, err 600 } 601 602 // Install session handle into t, which can be used for readonly 603 // operations later. 604 t.sh = sh 605 return sh, ts, nil 606 } 607 us := t.state 608 609 // SingleUse transaction should only be in either txNew state or txClosed 610 // state. 
611 return nil, nil, errUnexpectedTxState(us) 612} 613 614func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 615 for { 616 t.mu.Lock() 617 switch t.state { 618 case txClosed: 619 t.mu.Unlock() 620 return nil, nil, errTxClosed() 621 case txNew: 622 // State transit to txInit so that no further TimestampBound change 623 // is accepted. 624 t.state = txInit 625 t.mu.Unlock() 626 continue 627 case txInit: 628 if t.tx != nil { 629 // Wait for a transaction ID to become ready. 630 txReadyOrClosed := t.txReadyOrClosed 631 t.mu.Unlock() 632 select { 633 case <-txReadyOrClosed: 634 // Need to check transaction state again. 635 continue 636 case <-ctx.Done(): 637 // The waiting for initialization is timeout, return error 638 // directly. 639 return nil, nil, errTxInitTimeout() 640 } 641 } 642 // Take the ownership of initializing the transaction. 643 t.tx = transactionID{} 644 t.mu.Unlock() 645 // Begin a read-only transaction. 646 // 647 // TODO: consider adding a transaction option which allow queries to 648 // initiate transactions by themselves. Note that this option might 649 // not be always good because the ID of the new transaction won't 650 // be ready till the query returns some data or completes. 651 if err := t.begin(ctx); err != nil { 652 return nil, nil, err 653 } 654 655 // If t.begin() succeeded, t.state should have been changed to 656 // txActive, so we can just continue here. 657 continue 658 case txActive: 659 sh := t.sh 660 ts := &sppb.TransactionSelector{ 661 Selector: &sppb.TransactionSelector_Id{ 662 Id: t.tx, 663 }, 664 } 665 t.mu.Unlock() 666 return sh, ts, nil 667 } 668 state := t.state 669 t.mu.Unlock() 670 return nil, nil, errUnexpectedTxState(state) 671 } 672} 673 674func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 675 t.mu.Lock() 676 defer t.mu.Unlock() 677 if t.rts.IsZero() { 678 t.rts = ts 679 } 680} 681 682// release implements txReadEnv.release. 
683func (t *ReadOnlyTransaction) release(err error) { 684 t.mu.Lock() 685 sh := t.sh 686 t.mu.Unlock() 687 if sh != nil { // sh could be nil if t.acquire() fails. 688 if isSessionNotFoundError(err) { 689 sh.destroy() 690 } 691 if t.singleUse { 692 // If session handle is already destroyed, this becomes a noop. 693 sh.recycle() 694 } 695 } 696} 697 698// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 699// after being closed. 700func (t *ReadOnlyTransaction) Close() { 701 if t.singleUse { 702 return 703 } 704 t.mu.Lock() 705 if t.state != txClosed { 706 t.state = txClosed 707 close(t.txReadyOrClosed) 708 } 709 sh := t.sh 710 t.mu.Unlock() 711 if sh == nil { 712 return 713 } 714 // If session handle is already destroyed, this becomes a noop. If there are 715 // still active queries and if the recycled session is reused before they 716 // complete, Cloud Spanner will cancel them on behalf of the new transaction 717 // on the session. 718 if sh != nil { 719 sh.recycle() 720 } 721} 722 723// Timestamp returns the timestamp chosen to perform reads and queries in this 724// transaction. The value can only be read after some read or query has either 725// returned some data or completed without returning any data. 726func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 727 t.mu.Lock() 728 defer t.mu.Unlock() 729 if t.rts.IsZero() { 730 return t.rts, errRtsUnavailable() 731 } 732 return t.rts, nil 733} 734 735// WithTimestampBound specifies the TimestampBound to use for read or query. 736// This can only be used before the first read or query is invoked. Note: 737// bounded staleness is not available with general ReadOnlyTransactions; use a 738// single-use ReadOnlyTransaction instead. 739// 740// The returned value is the ReadOnlyTransaction so calls can be chained. 
741func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 742 t.mu.Lock() 743 defer t.mu.Unlock() 744 if t.state == txNew { 745 // Only allow to set TimestampBound before the first query. 746 t.tb = tb 747 } 748 return t 749} 750 751// ReadWriteTransaction provides a locking read-write transaction. 752// 753// This type of transaction is the only way to write data into Cloud Spanner; 754// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 755// transactions internally. These transactions rely on pessimistic locking and, 756// if necessary, two-phase commit. Locking read-write transactions may abort, 757// requiring the application to retry. However, the interface exposed by 758// (*Client).ReadWriteTransaction eliminates the need for applications to write 759// retry loops explicitly. 760// 761// Locking transactions may be used to atomically read-modify-write data 762// anywhere in a database. This type of transaction is externally consistent. 763// 764// Clients should attempt to minimize the amount of time a transaction is 765// active. Faster transactions commit with higher probability and cause less 766// contention. Cloud Spanner attempts to keep read locks active as long as the 767// transaction continues to do reads. Long periods of inactivity at the client 768// may cause Cloud Spanner to release a transaction's locks and abort it. 769// 770// Reads performed within a transaction acquire locks on the data being 771// read. Writes can only be done at commit time, after all reads have been 772// completed. Conceptually, a read-write transaction consists of zero or more 773// reads or SQL queries followed by a commit. 774// 775// See (*Client).ReadWriteTransaction for an example. 776// 777// Semantics 778// 779// Cloud Spanner can commit the transaction if all read locks it acquired are 780// still valid at commit time, and it is able to acquire write locks for all 781// writes. 
Cloud Spanner can abort the transaction for any reason. If a commit 782// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 783// not modified any user data in Cloud Spanner. 784// 785// Unless the transaction commits, Cloud Spanner makes no guarantees about how 786// long the transaction's locks were held for. It is an error to use Cloud 787// Spanner locks for any sort of mutual exclusion other than between Cloud 788// Spanner transactions themselves. 789// 790// Aborted transactions 791// 792// Application code does not need to retry explicitly; RunInTransaction will 793// automatically retry a transaction if an attempt results in an abort. The lock 794// priority of a transaction increases after each prior aborted transaction, 795// meaning that the next attempt has a slightly better chance of success than 796// before. 797// 798// Under some circumstances (e.g., many transactions attempting to modify the 799// same row(s)), a transaction can abort many times in a short period before 800// successfully committing. Thus, it is not a good idea to cap the number of 801// retries a transaction can attempt; instead, it is better to limit the total 802// amount of wall time spent retrying. 803// 804// Idle transactions 805// 806// A transaction is considered idle if it has no outstanding reads or SQL 807// queries and has not started a read or SQL query within the last 10 808// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 809// hold on to locks indefinitely. In that case, the commit will fail with error 810// ABORTED. 811// 812// If this behavior is undesirable, periodically executing a simple SQL query 813// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 814// idle. 815type ReadWriteTransaction struct { 816 // txReadOnly contains methods for performing transactional reads. 
817 txReadOnly 818 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 819 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 820 // during the initialization of ReadWriteTransaction. 821 tx transactionID 822 // mu protects concurrent access to the internal states of 823 // ReadWriteTransaction. 824 mu sync.Mutex 825 // state is the current transaction status of the read-write transaction. 826 state txState 827 // wb is the set of buffered mutations waiting to be committed. 828 wb []*Mutation 829} 830 831// BufferWrite adds a list of mutations to the set of updates that will be 832// applied when the transaction is committed. It does not actually apply the 833// write until the transaction is committed, so the operation does not block. 834// The effects of the write won't be visible to any reads (including reads done 835// in the same transaction) until the transaction commits. 836// 837// See the example for Client.ReadWriteTransaction. 838func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 839 t.mu.Lock() 840 defer t.mu.Unlock() 841 if t.state == txClosed { 842 return errTxClosed() 843 } 844 if t.state != txActive { 845 return errUnexpectedTxState(t.state) 846 } 847 t.wb = append(t.wb, ms...) 848 return nil 849} 850 851// Update executes a DML statement against the database. It returns the number 852// of affected rows. Update returns an error if the statement is a query. 853// However, the query is executed, and any data read will be validated upon 854// commit. 855func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 856 mode := sppb.ExecuteSqlRequest_NORMAL 857 return t.update(ctx, stmt, QueryOptions{ 858 Mode: &mode, 859 Options: t.qo.Options, 860 }) 861} 862 863// UpdateWithOptions executes a DML statement against the database. It returns 864// the number of affected rows. 
The given QueryOptions will be used for the 865// execution of this statement. 866func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 867 return t.update(ctx, stmt, t.qo.merge(opts)) 868} 869 870func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) { 871 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 872 defer func() { trace.EndSpan(ctx, err) }() 873 req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts) 874 if err != nil { 875 return 0, err 876 } 877 resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) 878 if err != nil { 879 return 0, ToSpannerError(err) 880 } 881 if resultSet.Stats == nil { 882 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 883 } 884 return extractRowCount(resultSet.Stats) 885} 886 887// BatchUpdate groups one or more DML statements and sends them to Spanner in a 888// single RPC. This is an efficient way to execute multiple DML statements. 889// 890// A slice of counts is returned, where each count represents the number of 891// affected rows for the given query at the same index. If an error occurs, 892// counts will be returned up to the query that encountered the error. 893func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 894 return t.BatchUpdateWithOptions(ctx, stmts, QueryOptions{}) 895} 896 897// BatchUpdateWithOptions groups one or more DML statements and sends them to 898// Spanner in a single RPC. This is an efficient way to execute multiple DML 899// statements. 900// 901// A slice of counts is returned, where each count represents the number of 902// affected rows for the given query at the same index. If an error occurs, 903// counts will be returned up to the query that encountered the error. 
904// 905// The priority given in the QueryOptions will be included with the RPC. 906// Any other options that are set in the QueryOptions struct will be ignored. 907func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) { 908 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 909 defer func() { trace.EndSpan(ctx, err) }() 910 911 sh, ts, err := t.acquire(ctx) 912 if err != nil { 913 return nil, err 914 } 915 // Cloud Spanner will return "Session not found" on bad sessions. 916 sid := sh.getID() 917 if sid == "" { 918 // Might happen if transaction is closed in the middle of a API call. 919 return nil, errSessionClosed(sh) 920 } 921 922 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 923 for _, st := range stmts { 924 params, paramTypes, err := st.convertParams() 925 if err != nil { 926 return nil, err 927 } 928 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 929 Sql: st.SQL, 930 Params: params, 931 ParamTypes: paramTypes, 932 }) 933 } 934 935 resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{ 936 Session: sh.getID(), 937 Transaction: ts, 938 Statements: sppbStmts, 939 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 940 RequestOptions: createRequestOptions(&opts), 941 }) 942 if err != nil { 943 return nil, ToSpannerError(err) 944 } 945 946 var counts []int64 947 for _, rs := range resp.ResultSets { 948 count, err := extractRowCount(rs.Stats) 949 if err != nil { 950 return nil, err 951 } 952 counts = append(counts, count) 953 } 954 if resp.Status != nil && resp.Status.Code != 0 { 955 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 956 } 957 return counts, nil 958} 959 960// acquire implements txReadEnv.acquire. 
961func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 962 ts := &sppb.TransactionSelector{ 963 Selector: &sppb.TransactionSelector_Id{ 964 Id: t.tx, 965 }, 966 } 967 t.mu.Lock() 968 defer t.mu.Unlock() 969 switch t.state { 970 case txClosed: 971 return nil, nil, errTxClosed() 972 case txActive: 973 return t.sh, ts, nil 974 } 975 return nil, nil, errUnexpectedTxState(t.state) 976} 977 978// release implements txReadEnv.release. 979func (t *ReadWriteTransaction) release(err error) { 980 t.mu.Lock() 981 sh := t.sh 982 t.mu.Unlock() 983 if sh != nil && isSessionNotFoundError(err) { 984 sh.destroy() 985 } 986} 987 988func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) { 989 res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 990 Session: sid, 991 Options: &sppb.TransactionOptions{ 992 Mode: &sppb.TransactionOptions_ReadWrite_{ 993 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 994 }, 995 }, 996 }) 997 if err != nil { 998 return nil, err 999 } 1000 if res.Id == nil { 1001 return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.") 1002 } 1003 return res.Id, nil 1004} 1005 1006// begin starts a read-write transacton on Cloud Spanner, it is always called 1007// before any of the public APIs. 1008func (t *ReadWriteTransaction) begin(ctx context.Context) error { 1009 if t.tx != nil { 1010 t.state = txActive 1011 return nil 1012 } 1013 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 1014 if err == nil { 1015 t.tx = tx 1016 t.state = txActive 1017 return nil 1018 } 1019 if isSessionNotFoundError(err) { 1020 t.sh.destroy() 1021 } 1022 return err 1023} 1024 1025// CommitResponse provides a response of a transaction commit in a database. 1026type CommitResponse struct { 1027 // CommitTs is the commit time for a transaction. 
1028 CommitTs time.Time 1029 // CommitStats is the commit statistics for a transaction. 1030 CommitStats *sppb.CommitResponse_CommitStats 1031} 1032 1033// CommitOptions provides options for commiting a transaction in a database. 1034type CommitOptions struct { 1035 ReturnCommitStats bool 1036} 1037 1038// commit tries to commit a readwrite transaction to Cloud Spanner. It also 1039// returns the commit response for the transactions. 1040func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) { 1041 resp := CommitResponse{} 1042 t.mu.Lock() 1043 t.state = txClosed // No further operations after commit. 1044 mPb, err := mutationsProto(t.wb) 1045 t.mu.Unlock() 1046 if err != nil { 1047 return resp, err 1048 } 1049 1050 // In case that sessionHandle was destroyed but transaction body fails to 1051 // report it. 1052 sid, client := t.sh.getID(), t.sh.getClient() 1053 if sid == "" || client == nil { 1054 return resp, errSessionClosed(t.sh) 1055 } 1056 1057 res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{ 1058 Session: sid, 1059 Transaction: &sppb.CommitRequest_TransactionId{ 1060 TransactionId: t.tx, 1061 }, 1062 RequestOptions: createRequestOptions(&t.txOpts), 1063 Mutations: mPb, 1064 ReturnCommitStats: options.ReturnCommitStats, 1065 }) 1066 if e != nil { 1067 return resp, toSpannerErrorWithCommitInfo(e, true) 1068 } 1069 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1070 resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1071 } 1072 if options.ReturnCommitStats { 1073 resp.CommitStats = res.CommitStats 1074 } 1075 if isSessionNotFoundError(err) { 1076 t.sh.destroy() 1077 } 1078 return resp, err 1079} 1080 1081// rollback is called when a commit is aborted or the transaction body runs 1082// into error. 1083func (t *ReadWriteTransaction) rollback(ctx context.Context) { 1084 t.mu.Lock() 1085 // Forbid further operations on rollbacked transaction. 
1086 t.state = txClosed 1087 t.mu.Unlock() 1088 // In case that sessionHandle was destroyed but transaction body fails to 1089 // report it. 1090 sid, client := t.sh.getID(), t.sh.getClient() 1091 if sid == "" || client == nil { 1092 return 1093 } 1094 err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{ 1095 Session: sid, 1096 TransactionId: t.tx, 1097 }) 1098 if isSessionNotFoundError(err) { 1099 t.sh.destroy() 1100 } 1101} 1102 1103// runInTransaction executes f under a read-write transaction context. 1104func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) { 1105 var ( 1106 resp CommitResponse 1107 err error 1108 errDuringCommit bool 1109 ) 1110 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 1111 // Try to commit if transaction body returns no error. 1112 resp, err = t.commit(ctx, t.txOpts.CommitOptions) 1113 errDuringCommit = err != nil 1114 } 1115 if err != nil { 1116 if isAbortedErr(err) { 1117 // Retry the transaction using the same session on ABORT error. 1118 // Cloud Spanner will create the new transaction with the previous 1119 // one's wound-wait priority. 1120 return resp, err 1121 } 1122 if isSessionNotFoundError(err) { 1123 t.sh.destroy() 1124 return resp, err 1125 } 1126 // Rollback the transaction unless the error occurred during the 1127 // commit. Executing a rollback after a commit has failed will 1128 // otherwise cause an error. Note that transient errors, such as 1129 // UNAVAILABLE, are already handled in the gRPC layer and do not show 1130 // up here. Context errors (deadline exceeded / canceled) during 1131 // commits are also not rolled back. 1132 if !errDuringCommit { 1133 t.rollback(ctx) 1134 } 1135 return resp, err 1136 } 1137 // err == nil, return commit response. 
1138 return resp, nil 1139} 1140 1141// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in 1142// order to run a read-write transaction in a statement-based way. 1143// 1144// This struct is returned by NewReadWriteStmtBasedTransaction and contains 1145// Commit() and Rollback() methods to end a transaction. 1146type ReadWriteStmtBasedTransaction struct { 1147 // ReadWriteTransaction contains methods for performing transactional reads. 1148 ReadWriteTransaction 1149 1150 options TransactionOptions 1151} 1152 1153// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or 1154// Rollback() must be called to end a transaction. If Commit() or Rollback() is 1155// not called, the session that is used by the transaction will not be returned 1156// to the pool and cause a session leak. 1157// 1158// This method should only be used when manual error handling and retry 1159// management is needed. Cloud Spanner may abort a read/write transaction at any 1160// moment, and each statement that is executed on the transaction should be 1161// checked for an Aborted error, including queries and read operations. 1162// 1163// For most use cases, client.ReadWriteTransaction should be used, as it will 1164// handle all Aborted and 'Session not found' errors automatically. 1165func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) { 1166 return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{}) 1167} 1168 1169// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction 1170// with configurable options. Commit() or Rollback() must be called to end a 1171// transaction. If Commit() or Rollback() is not called, the session that is 1172// used by the transaction will not be returned to the pool and cause a session 1173// leak. 
1174// 1175// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of 1176// NewReadWriteStmtBasedTransaction. 1177func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) { 1178 var ( 1179 sh *sessionHandle 1180 err error 1181 t *ReadWriteStmtBasedTransaction 1182 ) 1183 sh, err = c.idleSessions.takeWriteSession(ctx) 1184 if err != nil { 1185 // If session retrieval fails, just fail the transaction. 1186 return nil, err 1187 } 1188 t = &ReadWriteStmtBasedTransaction{ 1189 ReadWriteTransaction: ReadWriteTransaction{ 1190 tx: sh.getTransactionID(), 1191 }, 1192 } 1193 t.txReadOnly.sh = sh 1194 t.txReadOnly.txReadEnv = t 1195 t.txReadOnly.qo = c.qo 1196 t.txOpts = options 1197 1198 if err = t.begin(ctx); err != nil { 1199 if sh != nil { 1200 sh.recycle() 1201 } 1202 return nil, err 1203 } 1204 return t, err 1205} 1206 1207// Commit tries to commit a readwrite transaction to Cloud Spanner. It also 1208// returns the commit timestamp for the transactions. 1209func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) { 1210 resp, err := t.CommitWithReturnResp(ctx) 1211 return resp.CommitTs, err 1212} 1213 1214// CommitWithReturnResp tries to commit a readwrite transaction. It also returns 1215// the commit timestamp and stats for the transactions. 1216func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context) (CommitResponse, error) { 1217 resp, err := t.commit(ctx, t.txOpts.CommitOptions) 1218 // Rolling back an aborted transaction is not necessary. 1219 if err != nil && status.Code(err) != codes.Aborted { 1220 t.rollback(ctx) 1221 } 1222 if t.sh != nil { 1223 t.sh.recycle() 1224 } 1225 return resp, err 1226} 1227 1228// Rollback is called to cancel the ongoing transaction that has not been 1229// committed yet. 
1230func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) { 1231 t.rollback(ctx) 1232 if t.sh != nil { 1233 t.sh.recycle() 1234 } 1235} 1236 1237// writeOnlyTransaction provides the most efficient way of doing write-only 1238// transactions. It essentially does blind writes to Cloud Spanner. 1239type writeOnlyTransaction struct { 1240 // sp is the session pool which writeOnlyTransaction uses to get Cloud 1241 // Spanner sessions for blind writes. 1242 sp *sessionPool 1243 // commitPriority is the RPC priority to use for the commit operation. 1244 commitPriority sppb.RequestOptions_Priority 1245} 1246 1247func (t *writeOnlyTransaction) requestPriority() sppb.RequestOptions_Priority { 1248 return t.commitPriority 1249} 1250 1251// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 1252// unless one of the following happens: 1253// 1254// 1) Context times out. 1255// 2) An unretryable error (e.g. database not found) occurs. 1256// 3) There is a malformed Mutation object. 1257func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 1258 var ( 1259 ts time.Time 1260 sh *sessionHandle 1261 ) 1262 mPb, err := mutationsProto(ms) 1263 if err != nil { 1264 // Malformed mutation found, just return the error. 1265 return ts, err 1266 } 1267 1268 // Retry-loop for aborted transactions. 1269 // TODO: Replace with generic retryer. 1270 for { 1271 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 1272 // No usable session for doing the commit, take one from pool. 1273 sh, err = t.sp.take(ctx) 1274 if err != nil { 1275 // sessionPool.Take already retries for session 1276 // creations/retrivals. 
1277 return ts, err 1278 } 1279 defer sh.recycle() 1280 } 1281 res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 1282 Session: sh.getID(), 1283 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 1284 SingleUseTransaction: &sppb.TransactionOptions{ 1285 Mode: &sppb.TransactionOptions_ReadWrite_{ 1286 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 1287 }, 1288 }, 1289 }, 1290 Mutations: mPb, 1291 RequestOptions: createRequestOptions(t), 1292 }) 1293 if err != nil && !isAbortedErr(err) { 1294 if isSessionNotFoundError(err) { 1295 // Discard the bad session. 1296 sh.destroy() 1297 } 1298 return ts, toSpannerErrorWithCommitInfo(err, true) 1299 } else if err == nil { 1300 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 1301 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 1302 } 1303 break 1304 } 1305 } 1306 return ts, ToSpannerError(err) 1307} 1308 1309// isAbortedErr returns true if the error indicates that an gRPC call is 1310// aborted on the server side. 1311func isAbortedErr(err error) bool { 1312 if err == nil { 1313 return false 1314 } 1315 if ErrCode(err) == codes.Aborted { 1316 return true 1317 } 1318 return false 1319} 1320