1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 "google.golang.org/api/iterator" 27 sppb "google.golang.org/genproto/googleapis/spanner/v1" 28 "google.golang.org/grpc" 29 "google.golang.org/grpc/codes" 30 "google.golang.org/grpc/metadata" 31) 32 33// transactionID stores a transaction ID which uniquely identifies a transaction 34// in Cloud Spanner. 35type transactionID []byte 36 37// txReadEnv manages a read-transaction environment consisting of a session 38// handle and a transaction selector. 39type txReadEnv interface { 40 // acquire returns a read-transaction environment that can be used to 41 // perform a transactional read. 42 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 43 // sets the transaction's read timestamp 44 setTimestamp(time.Time) 45 // release should be called at the end of every transactional read to deal 46 // with session recycling. 47 release(error) 48} 49 50// txReadOnly contains methods for doing transactional reads. 51type txReadOnly struct { 52 // read-transaction environment for performing transactional read 53 // operations. 54 txReadEnv 55 56 // Atomic. Only needed for DML statements, but used forall. 57 sequenceNumber int64 58} 59 60// errSessionClosed returns error for using a recycled/destroyed session 61func errSessionClosed(sh *sessionHandle) error { 62 return spannerErrorf(codes.FailedPrecondition, 63 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 64} 65 66// Read returns a RowIterator for reading multiple rows from the database. 67func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 68 return t.ReadWithOptions(ctx, table, keys, columns, nil) 69} 70 71// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 72func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 73 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 74} 75 76// ReadOptions provides options for reading rows from a database. 77type ReadOptions struct { 78 // The index to use for reading. If non-empty, you can only read columns 79 // that are part of the index key, part of the primary key, or stored in the 80 // index due to a STORING clause in the index definition. 81 Index string 82 83 // The maximum number of rows to read. A limit value less than 1 means no 84 // limit. 85 Limit int 86} 87 88// ReadWithOptions returns a RowIterator for reading multiple rows from the 89// database. Pass a ReadOptions to modify the read operation. 90func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 91 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 92 defer func() { trace.EndSpan(ctx, ri.err) }() 93 var ( 94 sh *sessionHandle 95 ts *sppb.TransactionSelector 96 err error 97 ) 98 kset, err := keys.keySetProto() 99 if err != nil { 100 return &RowIterator{err: err} 101 } 102 if sh, ts, err = t.acquire(ctx); err != nil { 103 return &RowIterator{err: err} 104 } 105 // Cloud Spanner will return "Session not found" on bad sessions. 106 sid, client := sh.getID(), sh.getClient() 107 if sid == "" || client == nil { 108 // Might happen if transaction is closed in the middle of a API call. 109 return &RowIterator{err: errSessionClosed(sh)} 110 } 111 index := "" 112 limit := 0 113 if opts != nil { 114 index = opts.Index 115 if opts.Limit > 0 { 116 limit = opts.Limit 117 } 118 } 119 return stream( 120 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 121 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 122 return client.StreamingRead(ctx, 123 &sppb.ReadRequest{ 124 Session: sid, 125 Transaction: ts, 126 Table: table, 127 Index: index, 128 Columns: columns, 129 KeySet: kset, 130 ResumeToken: resumeToken, 131 Limit: int64(limit), 132 }) 133 }, 134 t.setTimestamp, 135 t.release, 136 ) 137} 138 139// errRowNotFound returns error for not being able to read the row identified by 140// key. 141func errRowNotFound(table string, key Key) error { 142 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 143} 144 145// ReadRow reads a single row from the database. 146// 147// If no row is present with the given key, then ReadRow returns an error where 148// spanner.ErrCode(err) is codes.NotFound. 149func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 150 iter := t.Read(ctx, table, key, columns) 151 defer iter.Stop() 152 row, err := iter.Next() 153 switch err { 154 case iterator.Done: 155 return nil, errRowNotFound(table, key) 156 case nil: 157 return row, nil 158 default: 159 return nil, err 160 } 161} 162 163// Query executes a query against the database. It returns a RowIterator for 164// retrieving the resulting rows. 165// 166// Query returns only row data, without a query plan or execution statistics. 167// Use QueryWithStats to get rows along with the plan and statistics. Use 168// AnalyzeQuery to get just the plan. 169func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 170 return t.query(ctx, statement, sppb.ExecuteSqlRequest_NORMAL) 171} 172 173// Query executes a SQL statement against the database. It returns a RowIterator 174// for retrieving the resulting rows. The RowIterator will also be populated 175// with a query plan and execution statistics. 176func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 177 return t.query(ctx, statement, sppb.ExecuteSqlRequest_PROFILE) 178} 179 180// AnalyzeQuery returns the query plan for statement. 181func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 182 iter := t.query(ctx, statement, sppb.ExecuteSqlRequest_PLAN) 183 defer iter.Stop() 184 for { 185 _, err := iter.Next() 186 if err == iterator.Done { 187 break 188 } 189 if err != nil { 190 return nil, err 191 } 192 } 193 if iter.QueryPlan == nil { 194 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 195 } 196 return iter.QueryPlan, nil 197} 198 199func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.ExecuteSqlRequest_QueryMode) (ri *RowIterator) { 200 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 201 defer func() { trace.EndSpan(ctx, ri.err) }() 202 req, sh, err := t.prepareExecuteSQL(ctx, statement, mode) 203 if err != nil { 204 return &RowIterator{err: err} 205 } 206 client := sh.getClient() 207 return stream( 208 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 209 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 210 req.ResumeToken = resumeToken 211 return client.ExecuteStreamingSql(ctx, req) 212 }, 213 t.setTimestamp, 214 t.release) 215} 216 217func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, mode sppb.ExecuteSqlRequest_QueryMode) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 218 sh, ts, err := t.acquire(ctx) 219 if err != nil { 220 return nil, nil, err 221 } 222 // Cloud Spanner will return "Session not found" on bad sessions. 223 sid := sh.getID() 224 if sid == "" { 225 // Might happen if transaction is closed in the middle of a API call. 226 return nil, nil, errSessionClosed(sh) 227 } 228 params, paramTypes, err := stmt.convertParams() 229 if err != nil { 230 return nil, nil, err 231 } 232 req := &sppb.ExecuteSqlRequest{ 233 Session: sid, 234 Transaction: ts, 235 Sql: stmt.SQL, 236 QueryMode: mode, 237 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 238 Params: params, 239 ParamTypes: paramTypes, 240 } 241 return req, sh, nil 242} 243 244// txState is the status of a transaction. 245type txState int 246 247const ( 248 // transaction is new, waiting to be initialized.. 249 txNew txState = iota 250 // transaction is being initialized. 251 txInit 252 // transaction is active and can perform read/write. 253 txActive 254 // transaction is closed, cannot be used anymore. 255 txClosed 256) 257 258// errRtsUnavailable returns error for read transaction's read timestamp being 259// unavailable. 260func errRtsUnavailable() error { 261 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 262} 263 264// errTxClosed returns error for using a closed transaction. 265func errTxClosed() error { 266 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 267} 268 269// errUnexpectedTxState returns error for transaction enters an unexpected state. 270func errUnexpectedTxState(ts txState) error { 271 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 272} 273 274// ReadOnlyTransaction provides a snapshot transaction with guaranteed 275// consistency across reads, but does not allow writes. Read-only transactions 276// can be configured to read at timestamps in the past. 277// 278// Read-only transactions do not take locks. Instead, they work by choosing a 279// Cloud Spanner timestamp, then executing all reads at that timestamp. Since 280// they do not acquire locks, they do not block concurrent read-write 281// transactions. 282// 283// Unlike locking read-write transactions, read-only transactions never abort. 284// They can fail if the chosen read timestamp is garbage collected; however, the 285// default garbage collection policy is generous enough that most applications 286// do not need to worry about this in practice. See the documentation of 287// TimestampBound for more details. 288// 289// A ReadOnlyTransaction consumes resources on the server until Close is called. 290type ReadOnlyTransaction struct { 291 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 292 mu sync.Mutex 293 // txReadOnly contains methods for performing transactional reads. 294 txReadOnly 295 // singleUse indicates that the transaction can be used for only one read. 296 singleUse bool 297 // sp is the session pool for allocating a session to execute the read-only 298 // transaction. It is set only once during initialization of the 299 // ReadOnlyTransaction. 300 sp *sessionPool 301 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 302 // ReadOnlyTransaction. 303 tx transactionID 304 // txReadyOrClosed is for broadcasting that transaction ID has been returned 305 // by Cloud Spanner or that transaction is closed. 306 txReadyOrClosed chan struct{} 307 // state is the current transaction status of the ReadOnly transaction. 308 state txState 309 // sh is the sessionHandle allocated from sp. 310 sh *sessionHandle 311 // rts is the read timestamp returned by transactional reads. 312 rts time.Time 313 // tb is the read staleness bound specification for transactional reads. 314 tb TimestampBound 315} 316 317// errTxInitTimeout returns error for timeout in waiting for initialization of 318// the transaction. 319func errTxInitTimeout() error { 320 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 321} 322 323// getTimestampBound returns the read staleness bound specified for the 324// ReadOnlyTransaction. 325func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 326 t.mu.Lock() 327 defer t.mu.Unlock() 328 return t.tb 329} 330 331// begin starts a snapshot read-only Transaction on Cloud Spanner. 332func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 333 var ( 334 locked bool 335 tx transactionID 336 rts time.Time 337 sh *sessionHandle 338 err error 339 ) 340 defer func() { 341 if !locked { 342 t.mu.Lock() 343 // Not necessary, just to make it clear that t.mu is being held when 344 // locked == true. 345 locked = true 346 } 347 if t.state != txClosed { 348 // Signal other initialization routines. 349 close(t.txReadyOrClosed) 350 t.txReadyOrClosed = make(chan struct{}) 351 } 352 t.mu.Unlock() 353 if err != nil && sh != nil { 354 // Got a valid session handle, but failed to initialize transaction= 355 // on Cloud Spanner. 356 if shouldDropSession(err) { 357 sh.destroy() 358 } 359 // If sh.destroy was already executed, this becomes a noop. 360 sh.recycle() 361 } 362 }() 363 sh, err = t.sp.take(ctx) 364 if err != nil { 365 return err 366 } 367 err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error { 368 res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 369 Session: sh.getID(), 370 Options: &sppb.TransactionOptions{ 371 Mode: &sppb.TransactionOptions_ReadOnly_{ 372 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 373 }, 374 }, 375 }) 376 if e != nil { 377 return e 378 } 379 tx = res.Id 380 if res.ReadTimestamp != nil { 381 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 382 } 383 return nil 384 }) 385 t.mu.Lock() 386 387 // defer function will be executed with t.mu being held. 388 locked = true 389 390 // During the execution of t.begin(), t.Close() was invoked. 391 if t.state == txClosed { 392 return errSessionClosed(sh) 393 } 394 395 // If begin() fails, this allows other queries to take over the 396 // initialization. 397 t.tx = nil 398 if err == nil { 399 t.tx = tx 400 t.rts = rts 401 t.sh = sh 402 // State transite to txActive. 403 t.state = txActive 404 } 405 return err 406} 407 408// acquire implements txReadEnv.acquire. 409func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 410 if err := checkNestedTxn(ctx); err != nil { 411 return nil, nil, err 412 } 413 if t.singleUse { 414 return t.acquireSingleUse(ctx) 415 } 416 return t.acquireMultiUse(ctx) 417} 418 419func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 420 t.mu.Lock() 421 defer t.mu.Unlock() 422 switch t.state { 423 case txClosed: 424 // A closed single-use transaction can never be reused. 425 return nil, nil, errTxClosed() 426 case txNew: 427 t.state = txClosed 428 ts := &sppb.TransactionSelector{ 429 Selector: &sppb.TransactionSelector_SingleUse{ 430 SingleUse: &sppb.TransactionOptions{ 431 Mode: &sppb.TransactionOptions_ReadOnly_{ 432 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 433 }, 434 }, 435 }, 436 } 437 sh, err := t.sp.take(ctx) 438 if err != nil { 439 return nil, nil, err 440 } 441 442 // Install session handle into t, which can be used for readonly 443 // operations later. 444 t.sh = sh 445 return sh, ts, nil 446 } 447 us := t.state 448 449 // SingleUse transaction should only be in either txNew state or txClosed 450 // state. 451 return nil, nil, errUnexpectedTxState(us) 452} 453 454func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 455 for { 456 t.mu.Lock() 457 switch t.state { 458 case txClosed: 459 t.mu.Unlock() 460 return nil, nil, errTxClosed() 461 case txNew: 462 // State transit to txInit so that no further TimestampBound change 463 // is accepted. 464 t.state = txInit 465 t.mu.Unlock() 466 continue 467 case txInit: 468 if t.tx != nil { 469 // Wait for a transaction ID to become ready. 470 txReadyOrClosed := t.txReadyOrClosed 471 t.mu.Unlock() 472 select { 473 case <-txReadyOrClosed: 474 // Need to check transaction state again. 475 continue 476 case <-ctx.Done(): 477 // The waiting for initialization is timeout, return error 478 // directly. 479 return nil, nil, errTxInitTimeout() 480 } 481 } 482 // Take the ownership of initializing the transaction. 483 t.tx = transactionID{} 484 t.mu.Unlock() 485 // Begin a read-only transaction. 486 // 487 // TODO: consider adding a transaction option which allow queries to 488 // initiate transactions by themselves. Note that this option might 489 // not be always good because the ID of the new transaction won't 490 // be ready till the query returns some data or completes. 491 if err := t.begin(ctx); err != nil { 492 return nil, nil, err 493 } 494 495 // If t.begin() succeeded, t.state should have been changed to 496 // txActive, so we can just continue here. 497 continue 498 case txActive: 499 sh := t.sh 500 ts := &sppb.TransactionSelector{ 501 Selector: &sppb.TransactionSelector_Id{ 502 Id: t.tx, 503 }, 504 } 505 t.mu.Unlock() 506 return sh, ts, nil 507 } 508 state := t.state 509 t.mu.Unlock() 510 return nil, nil, errUnexpectedTxState(state) 511 } 512} 513 514func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 515 t.mu.Lock() 516 defer t.mu.Unlock() 517 if t.rts.IsZero() { 518 t.rts = ts 519 } 520} 521 522// release implements txReadEnv.release. 523func (t *ReadOnlyTransaction) release(err error) { 524 t.mu.Lock() 525 sh := t.sh 526 t.mu.Unlock() 527 if sh != nil { // sh could be nil if t.acquire() fails. 528 if shouldDropSession(err) { 529 sh.destroy() 530 } 531 if t.singleUse { 532 // If session handle is already destroyed, this becomes a noop. 533 sh.recycle() 534 } 535 } 536} 537 538// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads 539// after being closed. 540func (t *ReadOnlyTransaction) Close() { 541 if t.singleUse { 542 return 543 } 544 t.mu.Lock() 545 if t.state != txClosed { 546 t.state = txClosed 547 close(t.txReadyOrClosed) 548 } 549 sh := t.sh 550 t.mu.Unlock() 551 if sh == nil { 552 return 553 } 554 // If session handle is already destroyed, this becomes a noop. If there are 555 // still active queries and if the recycled session is reused before they 556 // complete, Cloud Spanner will cancel them on behalf of the new transaction 557 // on the session. 558 if sh != nil { 559 sh.recycle() 560 } 561} 562 563// Timestamp returns the timestamp chosen to perform reads and queries in this 564// transaction. The value can only be read after some read or query has either 565// returned some data or completed without returning any data. 566func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 567 t.mu.Lock() 568 defer t.mu.Unlock() 569 if t.rts.IsZero() { 570 return t.rts, errRtsUnavailable() 571 } 572 return t.rts, nil 573} 574 575// WithTimestampBound specifies the TimestampBound to use for read or query. 576// This can only be used before the first read or query is invoked. Note: 577// bounded staleness is not available with general ReadOnlyTransactions; use a 578// single-use ReadOnlyTransaction instead. 579// 580// The returned value is the ReadOnlyTransaction so calls can be chained. 581func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 582 t.mu.Lock() 583 defer t.mu.Unlock() 584 if t.state == txNew { 585 // Only allow to set TimestampBound before the first query. 586 t.tb = tb 587 } 588 return t 589} 590 591// ReadWriteTransaction provides a locking read-write transaction. 592// 593// This type of transaction is the only way to write data into Cloud Spanner; 594// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 595// transactions internally. These transactions rely on pessimistic locking and, 596// if necessary, two-phase commit. Locking read-write transactions may abort, 597// requiring the application to retry. However, the interface exposed by 598// (*Client).ReadWriteTransaction eliminates the need for applications to write 599// retry loops explicitly. 600// 601// Locking transactions may be used to atomically read-modify-write data 602// anywhere in a database. This type of transaction is externally consistent. 603// 604// Clients should attempt to minimize the amount of time a transaction is 605// active. Faster transactions commit with higher probability and cause less 606// contention. Cloud Spanner attempts to keep read locks active as long as the 607// transaction continues to do reads. Long periods of inactivity at the client 608// may cause Cloud Spanner to release a transaction's locks and abort it. 609// 610// Reads performed within a transaction acquire locks on the data being 611// read. Writes can only be done at commit time, after all reads have been 612// completed. Conceptually, a read-write transaction consists of zero or more 613// reads or SQL queries followed by a commit. 614// 615// See (*Client).ReadWriteTransaction for an example. 616// 617// Semantics 618// 619// Cloud Spanner can commit the transaction if all read locks it acquired are 620// still valid at commit time, and it is able to acquire write locks for all 621// writes. Cloud Spanner can abort the transaction for any reason. If a commit 622// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has 623// not modified any user data in Cloud Spanner. 624// 625// Unless the transaction commits, Cloud Spanner makes no guarantees about how 626// long the transaction's locks were held for. It is an error to use Cloud 627// Spanner locks for any sort of mutual exclusion other than between Cloud 628// Spanner transactions themselves. 629// 630// Aborted transactions 631// 632// Application code does not need to retry explicitly; RunInTransaction will 633// automatically retry a transaction if an attempt results in an abort. The lock 634// priority of a transaction increases after each prior aborted transaction, 635// meaning that the next attempt has a slightly better chance of success than 636// before. 637// 638// Under some circumstances (e.g., many transactions attempting to modify the 639// same row(s)), a transaction can abort many times in a short period before 640// successfully committing. Thus, it is not a good idea to cap the number of 641// retries a transaction can attempt; instead, it is better to limit the total 642// amount of wall time spent retrying. 643// 644// Idle transactions 645// 646// A transaction is considered idle if it has no outstanding reads or SQL 647// queries and has not started a read or SQL query within the last 10 648// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't 649// hold on to locks indefinitely. In that case, the commit will fail with error 650// ABORTED. 651// 652// If this behavior is undesirable, periodically executing a simple SQL query 653// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 654// idle. 655type ReadWriteTransaction struct { 656 // txReadOnly contains methods for performing transactional reads. 657 txReadOnly 658 // sh is the sessionHandle allocated from sp. It is set only once during the 659 // initialization of ReadWriteTransaction. 660 sh *sessionHandle 661 // tx is the transaction ID in Cloud Spanner that uniquely identifies the 662 // ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin() 663 // during the initialization of ReadWriteTransaction. 664 tx transactionID 665 // mu protects concurrent access to the internal states of 666 // ReadWriteTransaction. 667 mu sync.Mutex 668 // state is the current transaction status of the read-write transaction. 669 state txState 670 // wb is the set of buffered mutations waiting to be committed. 671 wb []*Mutation 672} 673 674// BufferWrite adds a list of mutations to the set of updates that will be 675// applied when the transaction is committed. It does not actually apply the 676// write until the transaction is committed, so the operation does not block. 677// The effects of the write won't be visible to any reads (including reads done 678// in the same transaction) until the transaction commits. 679// 680// See the example for Client.ReadWriteTransaction. 681func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 682 t.mu.Lock() 683 defer t.mu.Unlock() 684 if t.state == txClosed { 685 return errTxClosed() 686 } 687 if t.state != txActive { 688 return errUnexpectedTxState(t.state) 689 } 690 t.wb = append(t.wb, ms...) 691 return nil 692} 693 694// Update executes a DML statement against the database. It returns the number 695// of affected rows. Update returns an error if the statement is a query. 696// However, the query is executed, and any data read will be validated upon 697// commit. 698func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 699 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 700 defer func() { trace.EndSpan(ctx, err) }() 701 req, sh, err := t.prepareExecuteSQL(ctx, stmt, sppb.ExecuteSqlRequest_NORMAL) 702 if err != nil { 703 return 0, err 704 } 705 resultSet, err := sh.getClient().ExecuteSql(ctx, req) 706 if err != nil { 707 return 0, err 708 } 709 if resultSet.Stats == nil { 710 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 711 } 712 return extractRowCount(resultSet.Stats) 713} 714 715// BatchUpdate groups one or more DML statements and sends them to Spanner in a 716// single RPC. This is an efficient way to execute multiple DML statements. 717// 718// A slice of counts is returned, where each count represents the number of 719// affected rows for the given query at the same index. If an error occurs, 720// counts will be returned up to the query that encountered the error. 721func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 722 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 723 defer func() { trace.EndSpan(ctx, err) }() 724 725 sh, ts, err := t.acquire(ctx) 726 if err != nil { 727 return nil, err 728 } 729 // Cloud Spanner will return "Session not found" on bad sessions. 730 sid := sh.getID() 731 if sid == "" { 732 // Might happen if transaction is closed in the middle of a API call. 733 return nil, errSessionClosed(sh) 734 } 735 736 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 737 for _, st := range stmts { 738 params, paramTypes, err := st.convertParams() 739 if err != nil { 740 return nil, err 741 } 742 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 743 Sql: st.SQL, 744 Params: params, 745 ParamTypes: paramTypes, 746 }) 747 } 748 749 resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{ 750 Session: sh.getID(), 751 Transaction: ts, 752 Statements: sppbStmts, 753 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 754 }) 755 if err != nil { 756 return nil, err 757 } 758 759 var counts []int64 760 for _, rs := range resp.ResultSets { 761 count, err := extractRowCount(rs.Stats) 762 if err != nil { 763 return nil, err 764 } 765 counts = append(counts, count) 766 } 767 if resp.Status.Code != 0 { 768 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 769 } 770 return counts, nil 771} 772 773// acquire implements txReadEnv.acquire. 774func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 775 ts := &sppb.TransactionSelector{ 776 Selector: &sppb.TransactionSelector_Id{ 777 Id: t.tx, 778 }, 779 } 780 t.mu.Lock() 781 defer t.mu.Unlock() 782 switch t.state { 783 case txClosed: 784 return nil, nil, errTxClosed() 785 case txActive: 786 return t.sh, ts, nil 787 } 788 return nil, nil, errUnexpectedTxState(t.state) 789} 790 791// release implements txReadEnv.release. 792func (t *ReadWriteTransaction) release(err error) { 793 t.mu.Lock() 794 sh := t.sh 795 t.mu.Unlock() 796 if sh != nil && shouldDropSession(err) { 797 sh.destroy() 798 } 799} 800 801func beginTransaction(ctx context.Context, sid string, client sppb.SpannerClient) (transactionID, error) { 802 var tx transactionID 803 err := runRetryable(ctx, func(ctx context.Context) error { 804 res, e := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 805 Session: sid, 806 Options: &sppb.TransactionOptions{ 807 Mode: &sppb.TransactionOptions_ReadWrite_{ 808 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 809 }, 810 }, 811 }) 812 if e != nil { 813 return e 814 } 815 tx = res.Id 816 return nil 817 }) 818 if err != nil { 819 return nil, err 820 } 821 return tx, nil 822} 823 824// begin starts a read-write transacton on Cloud Spanner, it is always called 825// before any of the public APIs. 826func (t *ReadWriteTransaction) begin(ctx context.Context) error { 827 if t.tx != nil { 828 t.state = txActive 829 return nil 830 } 831 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 832 if err == nil { 833 t.tx = tx 834 t.state = txActive 835 return nil 836 } 837 if shouldDropSession(err) { 838 t.sh.destroy() 839 } 840 return err 841} 842 843// commit tries to commit a readwrite transaction to Cloud Spanner. It also 844// returns the commit timestamp for the transactions. 845func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { 846 var ts time.Time 847 t.mu.Lock() 848 t.state = txClosed // No further operations after commit. 849 mPb, err := mutationsProto(t.wb) 850 t.mu.Unlock() 851 if err != nil { 852 return ts, err 853 } 854 // In case that sessionHandle was destroyed but transaction body fails to 855 // report it. 856 sid, client := t.sh.getID(), t.sh.getClient() 857 if sid == "" || client == nil { 858 return ts, errSessionClosed(t.sh) 859 } 860 err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { 861 var trailer metadata.MD 862 res, e := client.Commit(ctx, &sppb.CommitRequest{ 863 Session: sid, 864 Transaction: &sppb.CommitRequest_TransactionId{ 865 TransactionId: t.tx, 866 }, 867 Mutations: mPb, 868 }, grpc.Trailer(&trailer)) 869 if e != nil { 870 return toSpannerErrorWithMetadata(e, trailer) 871 } 872 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 873 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 874 } 875 return nil 876 }) 877 if shouldDropSession(err) { 878 t.sh.destroy() 879 } 880 return ts, err 881} 882 883// rollback is called when a commit is aborted or the transaction body runs 884// into error. 885func (t *ReadWriteTransaction) rollback(ctx context.Context) { 886 t.mu.Lock() 887 // Forbid further operations on rollbacked transaction. 888 t.state = txClosed 889 t.mu.Unlock() 890 // In case that sessionHandle was destroyed but transaction body fails to 891 // report it. 892 sid, client := t.sh.getID(), t.sh.getClient() 893 if sid == "" || client == nil { 894 return 895 } 896 err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { 897 _, e := client.Rollback(ctx, &sppb.RollbackRequest{ 898 Session: sid, 899 TransactionId: t.tx, 900 }) 901 return e 902 }) 903 if shouldDropSession(err) { 904 t.sh.destroy() 905 } 906} 907 908// runInTransaction executes f under a read-write transaction context. 909func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) { 910 var ( 911 ts time.Time 912 err error 913 ) 914 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 915 // Try to commit if transaction body returns no error. 916 ts, err = t.commit(ctx) 917 } 918 if err != nil { 919 if isAbortErr(err) { 920 // Retry the transaction using the same session on ABORT error. 921 // Cloud Spanner will create the new transaction with the previous 922 // one's wound-wait priority. 923 err = errRetry(err) 924 return ts, err 925 } 926 // Not going to commit, according to API spec, should rollback the 927 // transaction. 928 t.rollback(ctx) 929 return ts, err 930 } 931 // err == nil, return commit timestamp. 932 return ts, nil 933} 934 935// writeOnlyTransaction provides the most efficient way of doing write-only 936// transactions. It essentially does blind writes to Cloud Spanner. 937type writeOnlyTransaction struct { 938 // sp is the session pool which writeOnlyTransaction uses to get Cloud 939 // Spanner sessions for blind writes. 940 sp *sessionPool 941} 942 943// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, 944// unless one of the following happens: 945// 946// 1) Context times out. 947// 2) An unretryable error (e.g. database not found) occurs. 948// 3) There is a malformed Mutation object. 949func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 950 var ( 951 ts time.Time 952 sh *sessionHandle 953 ) 954 mPb, err := mutationsProto(ms) 955 if err != nil { 956 // Malformed mutation found, just return the error. 957 return ts, err 958 } 959 err = runRetryable(ctx, func(ct context.Context) error { 960 var e error 961 var trailers metadata.MD 962 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 963 // No usable session for doing the commit, take one from pool. 964 sh, e = t.sp.take(ctx) 965 if e != nil { 966 // sessionPool.Take already retries for session 967 // creations/retrivals. 968 return e 969 } 970 } 971 res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 972 Session: sh.getID(), 973 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 974 SingleUseTransaction: &sppb.TransactionOptions{ 975 Mode: &sppb.TransactionOptions_ReadWrite_{ 976 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 977 }, 978 }, 979 }, 980 Mutations: mPb, 981 }, grpc.Trailer(&trailers)) 982 if e != nil { 983 if isAbortErr(e) { 984 // Mask ABORT error as retryable, because aborted transactions 985 // are allowed to be retried. 986 return errRetry(toSpannerErrorWithMetadata(e, trailers)) 987 } 988 if shouldDropSession(e) { 989 // Discard the bad session. 990 sh.destroy() 991 } 992 return e 993 } 994 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 995 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 996 } 997 return nil 998 }) 999 if sh != nil { 1000 sh.recycle() 1001 } 1002 return ts, err 1003} 1004 1005// isAbortedErr returns true if the error indicates that an gRPC call is 1006// aborted on the server side. 1007func isAbortErr(err error) bool { 1008 if err == nil { 1009 return false 1010 } 1011 if ErrCode(err) == codes.Aborted { 1012 return true 1013 } 1014 return false 1015} 1016