1/* 2Copyright 2017 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "sync" 22 "sync/atomic" 23 "time" 24 25 "cloud.google.com/go/internal/trace" 26 "google.golang.org/api/iterator" 27 sppb "google.golang.org/genproto/googleapis/spanner/v1" 28 "google.golang.org/grpc" 29 "google.golang.org/grpc/codes" 30 "google.golang.org/grpc/metadata" 31) 32 33// transactionID stores a transaction ID which uniquely identifies a transaction in Cloud Spanner. 34type transactionID []byte 35 36// txReadEnv manages a read-transaction environment consisting of a session handle and a transaction selector. 37type txReadEnv interface { 38 // acquire returns a read-transaction environment that can be used to perform a transactional read. 39 acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) 40 // sets the transaction's read timestamp 41 setTimestamp(time.Time) 42 // release should be called at the end of every transactional read to deal with session recycling. 43 release(error) 44} 45 46// txReadOnly contains methods for doing transactional reads. 47type txReadOnly struct { 48 // read-transaction environment for performing transactional read operations. 49 txReadEnv 50 51 sequenceNumber int64 // Atomic. Only needed for DML statements, but used for all. 52} 53 54// errSessionClosed returns error for using a recycled/destroyed session 55func errSessionClosed(sh *sessionHandle) error { 56 return spannerErrorf(codes.FailedPrecondition, 57 "session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient()) 58} 59 60// Read returns a RowIterator for reading multiple rows from the database. 61func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator { 62 return t.ReadWithOptions(ctx, table, keys, columns, nil) 63} 64 65// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. 66func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) { 67 return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index}) 68} 69 70// ReadOptions provides options for reading rows from a database. 71type ReadOptions struct { 72 // The index to use for reading. If non-empty, you can only read columns that are 73 // part of the index key, part of the primary key, or stored in the index due to 74 // a STORING clause in the index definition. 75 Index string 76 77 // The maximum number of rows to read. A limit value less than 1 means no limit. 78 Limit int 79} 80 81// ReadWithOptions returns a RowIterator for reading multiple rows from the database. 82// Pass a ReadOptions to modify the read operation. 83func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) { 84 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read") 85 defer func() { trace.EndSpan(ctx, ri.err) }() 86 var ( 87 sh *sessionHandle 88 ts *sppb.TransactionSelector 89 err error 90 ) 91 kset, err := keys.keySetProto() 92 if err != nil { 93 return &RowIterator{err: err} 94 } 95 if sh, ts, err = t.acquire(ctx); err != nil { 96 return &RowIterator{err: err} 97 } 98 // Cloud Spanner will return "Session not found" on bad sessions. 99 sid, client := sh.getID(), sh.getClient() 100 if sid == "" || client == nil { 101 // Might happen if transaction is closed in the middle of a API call. 102 return &RowIterator{err: errSessionClosed(sh)} 103 } 104 index := "" 105 limit := 0 106 if opts != nil { 107 index = opts.Index 108 if opts.Limit > 0 { 109 limit = opts.Limit 110 } 111 } 112 return stream( 113 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 114 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 115 return client.StreamingRead(ctx, 116 &sppb.ReadRequest{ 117 Session: sid, 118 Transaction: ts, 119 Table: table, 120 Index: index, 121 Columns: columns, 122 KeySet: kset, 123 ResumeToken: resumeToken, 124 Limit: int64(limit), 125 }) 126 }, 127 t.setTimestamp, 128 t.release, 129 ) 130} 131 132// errRowNotFound returns error for not being able to read the row identified by key. 133func errRowNotFound(table string, key Key) error { 134 return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key) 135} 136 137// ReadRow reads a single row from the database. 138// 139// If no row is present with the given key, then ReadRow returns an error where 140// spanner.ErrCode(err) is codes.NotFound. 141func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) { 142 iter := t.Read(ctx, table, key, columns) 143 defer iter.Stop() 144 row, err := iter.Next() 145 switch err { 146 case iterator.Done: 147 return nil, errRowNotFound(table, key) 148 case nil: 149 return row, nil 150 default: 151 return nil, err 152 } 153} 154 155// Query executes a query against the database. It returns a RowIterator 156// for retrieving the resulting rows. 157// 158// Query returns only row data, without a query plan or execution statistics. 159// Use QueryWithStats to get rows along with the plan and statistics. 160// Use AnalyzeQuery to get just the plan. 161func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator { 162 return t.query(ctx, statement, sppb.ExecuteSqlRequest_NORMAL) 163} 164 165// Query executes a SQL statement against the database. It returns a RowIterator 166// for retrieving the resulting rows. The RowIterator will also be populated 167// with a query plan and execution statistics. 168func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator { 169 return t.query(ctx, statement, sppb.ExecuteSqlRequest_PROFILE) 170} 171 172// AnalyzeQuery returns the query plan for statement. 173func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) { 174 iter := t.query(ctx, statement, sppb.ExecuteSqlRequest_PLAN) 175 defer iter.Stop() 176 for { 177 _, err := iter.Next() 178 if err == iterator.Done { 179 break 180 } 181 if err != nil { 182 return nil, err 183 } 184 } 185 if iter.QueryPlan == nil { 186 return nil, spannerErrorf(codes.Internal, "query plan unavailable") 187 } 188 return iter.QueryPlan, nil 189} 190 191func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.ExecuteSqlRequest_QueryMode) (ri *RowIterator) { 192 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query") 193 defer func() { trace.EndSpan(ctx, ri.err) }() 194 req, sh, err := t.prepareExecuteSQL(ctx, statement, mode) 195 if err != nil { 196 return &RowIterator{err: err} 197 } 198 client := sh.getClient() 199 return stream( 200 contextWithOutgoingMetadata(ctx, sh.getMetadata()), 201 func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { 202 req.ResumeToken = resumeToken 203 return client.ExecuteStreamingSql(ctx, req) 204 }, 205 t.setTimestamp, 206 t.release) 207} 208 209func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, mode sppb.ExecuteSqlRequest_QueryMode) (*sppb.ExecuteSqlRequest, *sessionHandle, error) { 210 sh, ts, err := t.acquire(ctx) 211 if err != nil { 212 return nil, nil, err 213 } 214 // Cloud Spanner will return "Session not found" on bad sessions. 215 sid := sh.getID() 216 if sid == "" { 217 // Might happen if transaction is closed in the middle of a API call. 218 return nil, nil, errSessionClosed(sh) 219 } 220 params, paramTypes, err := stmt.convertParams() 221 if err != nil { 222 return nil, nil, err 223 } 224 req := &sppb.ExecuteSqlRequest{ 225 Session: sid, 226 Transaction: ts, 227 Sql: stmt.SQL, 228 QueryMode: mode, 229 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 230 Params: params, 231 ParamTypes: paramTypes, 232 } 233 return req, sh, nil 234} 235 236// txState is the status of a transaction. 237type txState int 238 239const ( 240 // transaction is new, waiting to be initialized. 241 txNew txState = iota 242 // transaction is being initialized. 243 txInit 244 // transaction is active and can perform read/write. 245 txActive 246 // transaction is closed, cannot be used anymore. 247 txClosed 248) 249 250// errRtsUnavailable returns error for read transaction's read timestamp being unavailable. 251func errRtsUnavailable() error { 252 return spannerErrorf(codes.Internal, "read timestamp is unavailable") 253} 254 255// errTxClosed returns error for using a closed transaction. 256func errTxClosed() error { 257 return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction") 258} 259 260// errUnexpectedTxState returns error for transaction enters an unexpected state. 261func errUnexpectedTxState(ts txState) error { 262 return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts) 263} 264 265// ReadOnlyTransaction provides a snapshot transaction with guaranteed 266// consistency across reads, but does not allow writes. Read-only 267// transactions can be configured to read at timestamps in the past. 268// 269// Read-only transactions do not take locks. Instead, they work by choosing a 270// Cloud Spanner timestamp, then executing all reads at that timestamp. Since they do 271// not acquire locks, they do not block concurrent read-write transactions. 272// 273// Unlike locking read-write transactions, read-only transactions never 274// abort. They can fail if the chosen read timestamp is garbage collected; 275// however, the default garbage collection policy is generous enough that most 276// applications do not need to worry about this in practice. See the 277// documentation of TimestampBound for more details. 278// 279// A ReadOnlyTransaction consumes resources on the server until Close is 280// called. 281type ReadOnlyTransaction struct { 282 // txReadOnly contains methods for performing transactional reads. 283 txReadOnly 284 285 // singleUse indicates that the transaction can be used for only one read. 286 singleUse bool 287 288 // sp is the session pool for allocating a session to execute the read-only transaction. It is set only once during initialization of the ReadOnlyTransaction. 289 sp *sessionPool 290 // mu protects concurrent access to the internal states of ReadOnlyTransaction. 291 mu sync.Mutex 292 // tx is the transaction ID in Cloud Spanner that uniquely identifies the ReadOnlyTransaction. 293 tx transactionID 294 // txReadyOrClosed is for broadcasting that transaction ID has been returned by Cloud Spanner or that transaction is closed. 295 txReadyOrClosed chan struct{} 296 // state is the current transaction status of the ReadOnly transaction. 297 state txState 298 // sh is the sessionHandle allocated from sp. 299 sh *sessionHandle 300 // rts is the read timestamp returned by transactional reads. 301 rts time.Time 302 // tb is the read staleness bound specification for transactional reads. 303 tb TimestampBound 304} 305 306// errTxInitTimeout returns error for timeout in waiting for initialization of the transaction. 307func errTxInitTimeout() error { 308 return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization") 309} 310 311// getTimestampBound returns the read staleness bound specified for the ReadOnlyTransaction. 312func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound { 313 t.mu.Lock() 314 defer t.mu.Unlock() 315 return t.tb 316} 317 318// begin starts a snapshot read-only Transaction on Cloud Spanner. 319func (t *ReadOnlyTransaction) begin(ctx context.Context) error { 320 var ( 321 locked bool 322 tx transactionID 323 rts time.Time 324 sh *sessionHandle 325 err error 326 ) 327 defer func() { 328 if !locked { 329 t.mu.Lock() 330 // Not necessary, just to make it clear that t.mu is being held when locked == true. 331 locked = true 332 } 333 if t.state != txClosed { 334 // Signal other initialization routines. 335 close(t.txReadyOrClosed) 336 t.txReadyOrClosed = make(chan struct{}) 337 } 338 t.mu.Unlock() 339 if err != nil && sh != nil { 340 // Got a valid session handle, but failed to initialize transaction on Cloud Spanner. 341 if shouldDropSession(err) { 342 sh.destroy() 343 } 344 // If sh.destroy was already executed, this becomes a noop. 345 sh.recycle() 346 } 347 }() 348 sh, err = t.sp.take(ctx) 349 if err != nil { 350 return err 351 } 352 err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error { 353 res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 354 Session: sh.getID(), 355 Options: &sppb.TransactionOptions{ 356 Mode: &sppb.TransactionOptions_ReadOnly_{ 357 ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), 358 }, 359 }, 360 }) 361 if e != nil { 362 return e 363 } 364 tx = res.Id 365 if res.ReadTimestamp != nil { 366 rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos)) 367 } 368 return nil 369 }) 370 t.mu.Lock() 371 locked = true // defer function will be executed with t.mu being held. 372 if t.state == txClosed { // During the execution of t.begin(), t.Close() was invoked. 373 return errSessionClosed(sh) 374 } 375 // If begin() fails, this allows other queries to take over the initialization. 376 t.tx = nil 377 if err == nil { 378 t.tx = tx 379 t.rts = rts 380 t.sh = sh 381 // State transite to txActive. 382 t.state = txActive 383 } 384 return err 385} 386 387// acquire implements txReadEnv.acquire. 388func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 389 if err := checkNestedTxn(ctx); err != nil { 390 return nil, nil, err 391 } 392 if t.singleUse { 393 return t.acquireSingleUse(ctx) 394 } 395 return t.acquireMultiUse(ctx) 396} 397 398func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 399 t.mu.Lock() 400 defer t.mu.Unlock() 401 switch t.state { 402 case txClosed: 403 // A closed single-use transaction can never be reused. 404 return nil, nil, errTxClosed() 405 case txNew: 406 t.state = txClosed 407 ts := &sppb.TransactionSelector{ 408 Selector: &sppb.TransactionSelector_SingleUse{ 409 SingleUse: &sppb.TransactionOptions{ 410 Mode: &sppb.TransactionOptions_ReadOnly_{ 411 ReadOnly: buildTransactionOptionsReadOnly(t.tb, true), 412 }, 413 }, 414 }, 415 } 416 sh, err := t.sp.take(ctx) 417 if err != nil { 418 return nil, nil, err 419 } 420 // Install session handle into t, which can be used for readonly operations later. 421 t.sh = sh 422 return sh, ts, nil 423 } 424 us := t.state 425 // SingleUse transaction should only be in either txNew state or txClosed state. 426 return nil, nil, errUnexpectedTxState(us) 427} 428 429func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 430 for { 431 t.mu.Lock() 432 switch t.state { 433 case txClosed: 434 t.mu.Unlock() 435 return nil, nil, errTxClosed() 436 case txNew: 437 // State transit to txInit so that no further TimestampBound change is accepted. 438 t.state = txInit 439 t.mu.Unlock() 440 continue 441 case txInit: 442 if t.tx != nil { 443 // Wait for a transaction ID to become ready. 444 txReadyOrClosed := t.txReadyOrClosed 445 t.mu.Unlock() 446 select { 447 case <-txReadyOrClosed: 448 // Need to check transaction state again. 449 continue 450 case <-ctx.Done(): 451 // The waiting for initialization is timeout, return error directly. 452 return nil, nil, errTxInitTimeout() 453 } 454 } 455 // Take the ownership of initializing the transaction. 456 t.tx = transactionID{} 457 t.mu.Unlock() 458 // Begin a read-only transaction. 459 // TODO: consider adding a transaction option which allow queries to initiate transactions by themselves. Note that this option might not be 460 // always good because the ID of the new transaction won't be ready till the query returns some data or completes. 461 if err := t.begin(ctx); err != nil { 462 return nil, nil, err 463 } 464 // If t.begin() succeeded, t.state should have been changed to txActive, so we can just continue here. 465 continue 466 case txActive: 467 sh := t.sh 468 ts := &sppb.TransactionSelector{ 469 Selector: &sppb.TransactionSelector_Id{ 470 Id: t.tx, 471 }, 472 } 473 t.mu.Unlock() 474 return sh, ts, nil 475 } 476 state := t.state 477 t.mu.Unlock() 478 return nil, nil, errUnexpectedTxState(state) 479 } 480} 481 482func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) { 483 t.mu.Lock() 484 defer t.mu.Unlock() 485 if t.rts.IsZero() { 486 t.rts = ts 487 } 488} 489 490// release implements txReadEnv.release. 491func (t *ReadOnlyTransaction) release(err error) { 492 t.mu.Lock() 493 sh := t.sh 494 t.mu.Unlock() 495 if sh != nil { // sh could be nil if t.acquire() fails. 496 if shouldDropSession(err) { 497 sh.destroy() 498 } 499 if t.singleUse { 500 // If session handle is already destroyed, this becomes a noop. 501 sh.recycle() 502 } 503 } 504} 505 506// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads after being closed. 507func (t *ReadOnlyTransaction) Close() { 508 if t.singleUse { 509 return 510 } 511 t.mu.Lock() 512 if t.state != txClosed { 513 t.state = txClosed 514 close(t.txReadyOrClosed) 515 } 516 sh := t.sh 517 t.mu.Unlock() 518 if sh == nil { 519 return 520 } 521 // If session handle is already destroyed, this becomes a noop. 522 // If there are still active queries and if the recycled session is reused before they complete, Cloud Spanner will cancel them 523 // on behalf of the new transaction on the session. 524 if sh != nil { 525 sh.recycle() 526 } 527} 528 529// Timestamp returns the timestamp chosen to perform reads and 530// queries in this transaction. The value can only be read after some 531// read or query has either returned some data or completed without 532// returning any data. 533func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) { 534 t.mu.Lock() 535 defer t.mu.Unlock() 536 if t.rts.IsZero() { 537 return t.rts, errRtsUnavailable() 538 } 539 return t.rts, nil 540} 541 542// WithTimestampBound specifies the TimestampBound to use for read or query. 543// This can only be used before the first read or query is invoked. Note: 544// bounded staleness is not available with general ReadOnlyTransactions; use a 545// single-use ReadOnlyTransaction instead. 546// 547// The returned value is the ReadOnlyTransaction so calls can be chained. 548func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction { 549 t.mu.Lock() 550 defer t.mu.Unlock() 551 if t.state == txNew { 552 // Only allow to set TimestampBound before the first query. 553 t.tb = tb 554 } 555 return t 556} 557 558// ReadWriteTransaction provides a locking read-write transaction. 559// 560// This type of transaction is the only way to write data into Cloud Spanner; 561// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use 562// transactions internally. These transactions rely on pessimistic locking and, 563// if necessary, two-phase commit. Locking read-write transactions may abort, 564// requiring the application to retry. However, the interface exposed by 565// (*Client).ReadWriteTransaction eliminates the need for applications to write 566// retry loops explicitly. 567// 568// Locking transactions may be used to atomically read-modify-write data 569// anywhere in a database. This type of transaction is externally consistent. 570// 571// Clients should attempt to minimize the amount of time a transaction is 572// active. Faster transactions commit with higher probability and cause less 573// contention. Cloud Spanner attempts to keep read locks active as long as the 574// transaction continues to do reads. Long periods of inactivity at the client 575// may cause Cloud Spanner to release a transaction's locks and abort it. 576// 577// Reads performed within a transaction acquire locks on the data being 578// read. Writes can only be done at commit time, after all reads have been 579// completed. Conceptually, a read-write transaction consists of zero or more 580// reads or SQL queries followed by a commit. 581// 582// See (*Client).ReadWriteTransaction for an example. 583// 584// Semantics 585// 586// Cloud Spanner can commit the transaction if all read locks it acquired are still 587// valid at commit time, and it is able to acquire write locks for all 588// writes. Cloud Spanner can abort the transaction for any reason. If a commit 589// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has not 590// modified any user data in Cloud Spanner. 591// 592// Unless the transaction commits, Cloud Spanner makes no guarantees about how long 593// the transaction's locks were held for. It is an error to use Cloud Spanner locks 594// for any sort of mutual exclusion other than between Cloud Spanner transactions 595// themselves. 596// 597// Aborted transactions 598// 599// Application code does not need to retry explicitly; RunInTransaction will 600// automatically retry a transaction if an attempt results in an abort. The 601// lock priority of a transaction increases after each prior aborted 602// transaction, meaning that the next attempt has a slightly better chance of 603// success than before. 604// 605// Under some circumstances (e.g., many transactions attempting to modify the 606// same row(s)), a transaction can abort many times in a short period before 607// successfully committing. Thus, it is not a good idea to cap the number of 608// retries a transaction can attempt; instead, it is better to limit the total 609// amount of wall time spent retrying. 610// 611// Idle transactions 612// 613// A transaction is considered idle if it has no outstanding reads or SQL 614// queries and has not started a read or SQL query within the last 10 615// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't hold 616// on to locks indefinitely. In that case, the commit will fail with error 617// ABORTED. 618// 619// If this behavior is undesirable, periodically executing a simple SQL query 620// in the transaction (e.g., SELECT 1) prevents the transaction from becoming 621// idle. 622type ReadWriteTransaction struct { 623 // txReadOnly contains methods for performing transactional reads. 624 txReadOnly 625 // sh is the sessionHandle allocated from sp. It is set only once during the initialization of ReadWriteTransaction. 626 sh *sessionHandle 627 // tx is the transaction ID in Cloud Spanner that uniquely identifies the ReadWriteTransaction. 628 // It is set only once in ReadWriteTransaction.begin() during the initialization of ReadWriteTransaction. 629 tx transactionID 630 // mu protects concurrent access to the internal states of ReadWriteTransaction. 631 mu sync.Mutex 632 // state is the current transaction status of the read-write transaction. 633 state txState 634 // wb is the set of buffered mutations waiting to be committed. 635 wb []*Mutation 636} 637 638// BufferWrite adds a list of mutations to the set of updates that will be 639// applied when the transaction is committed. It does not actually apply the 640// write until the transaction is committed, so the operation does not 641// block. The effects of the write won't be visible to any reads (including 642// reads done in the same transaction) until the transaction commits. 643// 644// See the example for Client.ReadWriteTransaction. 645func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error { 646 t.mu.Lock() 647 defer t.mu.Unlock() 648 if t.state == txClosed { 649 return errTxClosed() 650 } 651 if t.state != txActive { 652 return errUnexpectedTxState(t.state) 653 } 654 t.wb = append(t.wb, ms...) 655 return nil 656} 657 658// Update executes a DML statement against the database. It returns the number of 659// affected rows. 660// Update returns an error if the statement is a query. However, the 661// query is executed, and any data read will be validated upon commit. 662func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) { 663 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update") 664 defer func() { trace.EndSpan(ctx, err) }() 665 req, sh, err := t.prepareExecuteSQL(ctx, stmt, sppb.ExecuteSqlRequest_NORMAL) 666 if err != nil { 667 return 0, err 668 } 669 resultSet, err := sh.getClient().ExecuteSql(ctx, req) 670 if err != nil { 671 return 0, err 672 } 673 if resultSet.Stats == nil { 674 return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL) 675 } 676 return extractRowCount(resultSet.Stats) 677} 678 679// BatchUpdate groups one or more DML statements and sends them to Spanner in a 680// single RPC. This is an efficient way to execute multiple DML statements. 681// 682// A slice of counts is returned, where each count represents the number of 683// affected rows for the given query at the same index. If an error occurs, 684// counts will be returned up to the query that encountered the error. 685func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) { 686 ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate") 687 defer func() { trace.EndSpan(ctx, err) }() 688 689 sh, ts, err := t.acquire(ctx) 690 if err != nil { 691 return nil, err 692 } 693 // Cloud Spanner will return "Session not found" on bad sessions. 694 sid := sh.getID() 695 if sid == "" { 696 // Might happen if transaction is closed in the middle of a API call. 697 return nil, errSessionClosed(sh) 698 } 699 700 var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement 701 for _, st := range stmts { 702 params, paramTypes, err := st.convertParams() 703 if err != nil { 704 return nil, err 705 } 706 sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{ 707 Sql: st.SQL, 708 Params: params, 709 ParamTypes: paramTypes, 710 }) 711 } 712 713 resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{ 714 Session: sh.getID(), 715 Transaction: ts, 716 Statements: sppbStmts, 717 Seqno: atomic.AddInt64(&t.sequenceNumber, 1), 718 }) 719 if err != nil { 720 return nil, err 721 } 722 723 var counts []int64 724 for _, rs := range resp.ResultSets { 725 count, err := extractRowCount(rs.Stats) 726 if err != nil { 727 return nil, err 728 } 729 counts = append(counts, count) 730 } 731 if resp.Status.Code != 0 { 732 return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message) 733 } 734 return counts, nil 735} 736 737// acquire implements txReadEnv.acquire. 738func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) { 739 ts := &sppb.TransactionSelector{ 740 Selector: &sppb.TransactionSelector_Id{ 741 Id: t.tx, 742 }, 743 } 744 t.mu.Lock() 745 defer t.mu.Unlock() 746 switch t.state { 747 case txClosed: 748 return nil, nil, errTxClosed() 749 case txActive: 750 return t.sh, ts, nil 751 } 752 return nil, nil, errUnexpectedTxState(t.state) 753} 754 755// release implements txReadEnv.release. 756func (t *ReadWriteTransaction) release(err error) { 757 t.mu.Lock() 758 sh := t.sh 759 t.mu.Unlock() 760 if sh != nil && shouldDropSession(err) { 761 sh.destroy() 762 } 763} 764 765func beginTransaction(ctx context.Context, sid string, client sppb.SpannerClient) (transactionID, error) { 766 var tx transactionID 767 err := runRetryable(ctx, func(ctx context.Context) error { 768 res, e := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{ 769 Session: sid, 770 Options: &sppb.TransactionOptions{ 771 Mode: &sppb.TransactionOptions_ReadWrite_{ 772 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 773 }, 774 }, 775 }) 776 if e != nil { 777 return e 778 } 779 tx = res.Id 780 return nil 781 }) 782 if err != nil { 783 return nil, err 784 } 785 return tx, nil 786} 787 788// begin starts a read-write transacton on Cloud Spanner, it is always called before any of the public APIs. 789func (t *ReadWriteTransaction) begin(ctx context.Context) error { 790 if t.tx != nil { 791 t.state = txActive 792 return nil 793 } 794 tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient()) 795 if err == nil { 796 t.tx = tx 797 t.state = txActive 798 return nil 799 } 800 if shouldDropSession(err) { 801 t.sh.destroy() 802 } 803 return err 804} 805 806// commit tries to commit a readwrite transaction to Cloud Spanner. It also returns the commit timestamp for the transactions. 807func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) { 808 var ts time.Time 809 t.mu.Lock() 810 t.state = txClosed // No further operations after commit. 811 mPb, err := mutationsProto(t.wb) 812 t.mu.Unlock() 813 if err != nil { 814 return ts, err 815 } 816 // In case that sessionHandle was destroyed but transaction body fails to report it. 817 sid, client := t.sh.getID(), t.sh.getClient() 818 if sid == "" || client == nil { 819 return ts, errSessionClosed(t.sh) 820 } 821 err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { 822 var trailer metadata.MD 823 res, e := client.Commit(ctx, &sppb.CommitRequest{ 824 Session: sid, 825 Transaction: &sppb.CommitRequest_TransactionId{ 826 TransactionId: t.tx, 827 }, 828 Mutations: mPb, 829 }, grpc.Trailer(&trailer)) 830 if e != nil { 831 return toSpannerErrorWithMetadata(e, trailer) 832 } 833 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 834 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 835 } 836 return nil 837 }) 838 if shouldDropSession(err) { 839 t.sh.destroy() 840 } 841 return ts, err 842} 843 844// rollback is called when a commit is aborted or the transaction body runs into error. 845func (t *ReadWriteTransaction) rollback(ctx context.Context) { 846 t.mu.Lock() 847 // Forbid further operations on rollbacked transaction. 848 t.state = txClosed 849 t.mu.Unlock() 850 // In case that sessionHandle was destroyed but transaction body fails to report it. 851 sid, client := t.sh.getID(), t.sh.getClient() 852 if sid == "" || client == nil { 853 return 854 } 855 err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error { 856 _, e := client.Rollback(ctx, &sppb.RollbackRequest{ 857 Session: sid, 858 TransactionId: t.tx, 859 }) 860 return e 861 }) 862 if shouldDropSession(err) { 863 t.sh.destroy() 864 } 865} 866 867// runInTransaction executes f under a read-write transaction context. 868func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) { 869 var ( 870 ts time.Time 871 err error 872 ) 873 if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil { 874 // Try to commit if transaction body returns no error. 875 ts, err = t.commit(ctx) 876 } 877 if err != nil { 878 if isAbortErr(err) { 879 // Retry the transaction using the same session on ABORT error. 880 // Cloud Spanner will create the new transaction with the previous one's wound-wait priority. 881 err = errRetry(err) 882 return ts, err 883 } 884 // Not going to commit, according to API spec, should rollback the transaction. 885 t.rollback(ctx) 886 return ts, err 887 } 888 // err == nil, return commit timestamp. 889 return ts, nil 890} 891 892// writeOnlyTransaction provides the most efficient way of doing write-only transactions. It essentially does blind writes to Cloud Spanner. 893type writeOnlyTransaction struct { 894 // sp is the session pool which writeOnlyTransaction uses to get Cloud Spanner sessions for blind writes. 895 sp *sessionPool 896} 897 898// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, unless one of the following happens: 899// 1) Context times out. 900// 2) An unretryable error (e.g. database not found) occurs. 901// 3) There is a malformed Mutation object. 902func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) { 903 var ( 904 ts time.Time 905 sh *sessionHandle 906 ) 907 mPb, err := mutationsProto(ms) 908 if err != nil { 909 // Malformed mutation found, just return the error. 910 return ts, err 911 } 912 err = runRetryable(ctx, func(ct context.Context) error { 913 var e error 914 var trailers metadata.MD 915 if sh == nil || sh.getID() == "" || sh.getClient() == nil { 916 // No usable session for doing the commit, take one from pool. 917 sh, e = t.sp.take(ctx) 918 if e != nil { 919 // sessionPool.Take already retries for session creations/retrivals. 920 return e 921 } 922 } 923 res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{ 924 Session: sh.getID(), 925 Transaction: &sppb.CommitRequest_SingleUseTransaction{ 926 SingleUseTransaction: &sppb.TransactionOptions{ 927 Mode: &sppb.TransactionOptions_ReadWrite_{ 928 ReadWrite: &sppb.TransactionOptions_ReadWrite{}, 929 }, 930 }, 931 }, 932 Mutations: mPb, 933 }, grpc.Trailer(&trailers)) 934 if e != nil { 935 if isAbortErr(e) { 936 // Mask ABORT error as retryable, because aborted transactions are allowed to be retried. 937 return errRetry(toSpannerErrorWithMetadata(e, trailers)) 938 } 939 if shouldDropSession(e) { 940 // Discard the bad session. 941 sh.destroy() 942 } 943 return e 944 } 945 if tstamp := res.GetCommitTimestamp(); tstamp != nil { 946 ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos)) 947 } 948 return nil 949 }) 950 if sh != nil { 951 sh.recycle() 952 } 953 return ts, err 954} 955 956// isAbortedErr returns true if the error indicates that an gRPC call is aborted on the server side. 957func isAbortErr(err error) bool { 958 if err == nil { 959 return false 960 } 961 if ErrCode(err) == codes.Aborted { 962 return true 963 } 964 return false 965} 966