1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	"google.golang.org/api/iterator"
27	sppb "google.golang.org/genproto/googleapis/spanner/v1"
28	"google.golang.org/grpc"
29	"google.golang.org/grpc/codes"
30	"google.golang.org/grpc/metadata"
31)
32
// transactionID stores a transaction ID which uniquely identifies a transaction in Cloud Spanner.
// It is a raw byte slice; elsewhere in this file a nil value means that no
// transaction ID has been assigned yet.
type transactionID []byte
35
// txReadEnv manages a read-transaction environment consisting of a session handle and a transaction selector.
// It is embedded in txReadOnly, whose read and query methods call acquire
// before building a request and hand setTimestamp and release to the
// streaming machinery.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to perform a transactional read.
	// It yields the session handle to issue the RPC on and the transaction
	// selector to place in the request, or an error if neither is available.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp sets the transaction's read timestamp.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal with session recycling.
	// The argument is the error, if any, that the read ended with.
	release(error)
}
45
// txReadOnly contains methods for doing transactional reads.
// It is embedded by both ReadOnlyTransaction and ReadWriteTransaction, which
// supply the txReadEnv implementation.
type txReadOnly struct {
	// read-transaction environment for performing transactional read operations.
	txReadEnv

	// sequenceNumber is incremented atomically for every SQL request built by
	// this transaction and sent as the request's Seqno.
	sequenceNumber int64 // Atomic. Only needed for DML statements, but used for all.
}
53
54// errSessionClosed returns error for using a recycled/destroyed session
55func errSessionClosed(sh *sessionHandle) error {
56	return spannerErrorf(codes.FailedPrecondition,
57		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
58}
59
60// Read returns a RowIterator for reading multiple rows from the database.
61func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
62	return t.ReadWithOptions(ctx, table, keys, columns, nil)
63}
64
65// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
66func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
67	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
68}
69
// ReadOptions provides options for reading rows from a database.
// The zero value requests a read with no index and no row limit.
type ReadOptions struct {
	// The index to use for reading. If non-empty, you can only read columns that are
	// part of the index key, part of the primary key, or stored in the index due to
	// a STORING clause in the index definition.
	Index string

	// The maximum number of rows to read. A limit value less than 1 means no limit.
	Limit int
}
80
81// ReadWithOptions returns a RowIterator for reading multiple rows from the database.
82// Pass a ReadOptions to modify the read operation.
83func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
84	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
85	defer func() { trace.EndSpan(ctx, ri.err) }()
86	var (
87		sh  *sessionHandle
88		ts  *sppb.TransactionSelector
89		err error
90	)
91	kset, err := keys.keySetProto()
92	if err != nil {
93		return &RowIterator{err: err}
94	}
95	if sh, ts, err = t.acquire(ctx); err != nil {
96		return &RowIterator{err: err}
97	}
98	// Cloud Spanner will return "Session not found" on bad sessions.
99	sid, client := sh.getID(), sh.getClient()
100	if sid == "" || client == nil {
101		// Might happen if transaction is closed in the middle of a API call.
102		return &RowIterator{err: errSessionClosed(sh)}
103	}
104	index := ""
105	limit := 0
106	if opts != nil {
107		index = opts.Index
108		if opts.Limit > 0 {
109			limit = opts.Limit
110		}
111	}
112	return stream(
113		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
114		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
115			return client.StreamingRead(ctx,
116				&sppb.ReadRequest{
117					Session:     sid,
118					Transaction: ts,
119					Table:       table,
120					Index:       index,
121					Columns:     columns,
122					KeySet:      kset,
123					ResumeToken: resumeToken,
124					Limit:       int64(limit),
125				})
126		},
127		t.setTimestamp,
128		t.release,
129	)
130}
131
132// errRowNotFound returns error for not being able to read the row identified by key.
133func errRowNotFound(table string, key Key) error {
134	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
135}
136
137// ReadRow reads a single row from the database.
138//
139// If no row is present with the given key, then ReadRow returns an error where
140// spanner.ErrCode(err) is codes.NotFound.
141func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
142	iter := t.Read(ctx, table, key, columns)
143	defer iter.Stop()
144	row, err := iter.Next()
145	switch err {
146	case iterator.Done:
147		return nil, errRowNotFound(table, key)
148	case nil:
149		return row, nil
150	default:
151		return nil, err
152	}
153}
154
155// Query executes a query against the database. It returns a RowIterator
156// for retrieving the resulting rows.
157//
158// Query returns only row data, without a query plan or execution statistics.
159// Use QueryWithStats to get rows along with the plan and statistics.
160// Use AnalyzeQuery to get just the plan.
161func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
162	return t.query(ctx, statement, sppb.ExecuteSqlRequest_NORMAL)
163}
164
165// Query executes a SQL statement against the database. It returns a RowIterator
166// for retrieving the resulting rows. The RowIterator will also be populated
167// with a query plan and execution statistics.
168func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
169	return t.query(ctx, statement, sppb.ExecuteSqlRequest_PROFILE)
170}
171
172// AnalyzeQuery returns the query plan for statement.
173func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
174	iter := t.query(ctx, statement, sppb.ExecuteSqlRequest_PLAN)
175	defer iter.Stop()
176	for {
177		_, err := iter.Next()
178		if err == iterator.Done {
179			break
180		}
181		if err != nil {
182			return nil, err
183		}
184	}
185	if iter.QueryPlan == nil {
186		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
187	}
188	return iter.QueryPlan, nil
189}
190
191func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.ExecuteSqlRequest_QueryMode) (ri *RowIterator) {
192	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
193	defer func() { trace.EndSpan(ctx, ri.err) }()
194	req, sh, err := t.prepareExecuteSQL(ctx, statement, mode)
195	if err != nil {
196		return &RowIterator{err: err}
197	}
198	client := sh.getClient()
199	return stream(
200		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
201		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
202			req.ResumeToken = resumeToken
203			return client.ExecuteStreamingSql(ctx, req)
204		},
205		t.setTimestamp,
206		t.release)
207}
208
209func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, mode sppb.ExecuteSqlRequest_QueryMode) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
210	sh, ts, err := t.acquire(ctx)
211	if err != nil {
212		return nil, nil, err
213	}
214	// Cloud Spanner will return "Session not found" on bad sessions.
215	sid := sh.getID()
216	if sid == "" {
217		// Might happen if transaction is closed in the middle of a API call.
218		return nil, nil, errSessionClosed(sh)
219	}
220	params, paramTypes, err := stmt.convertParams()
221	if err != nil {
222		return nil, nil, err
223	}
224	req := &sppb.ExecuteSqlRequest{
225		Session:     sid,
226		Transaction: ts,
227		Sql:         stmt.SQL,
228		QueryMode:   mode,
229		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
230		Params:      params,
231		ParamTypes:  paramTypes,
232	}
233	return req, sh, nil
234}
235
// txState is the status of a transaction.
type txState int

// The states a transaction can be in. txNew is the zero value, so a freshly
// allocated transaction struct starts out as txNew.
const (
	// transaction is new, waiting to be initialized.
	txNew txState = iota
	// transaction is being initialized.
	txInit
	// transaction is active and can perform read/write.
	txActive
	// transaction is closed, cannot be used anymore.
	txClosed
)
249
250// errRtsUnavailable returns error for read transaction's read timestamp being unavailable.
251func errRtsUnavailable() error {
252	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
253}
254
255// errTxClosed returns error for using a closed transaction.
256func errTxClosed() error {
257	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
258}
259
260// errUnexpectedTxState returns error for transaction enters an unexpected state.
261func errUnexpectedTxState(ts txState) error {
262	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
263}
264
265// ReadOnlyTransaction provides a snapshot transaction with guaranteed
266// consistency across reads, but does not allow writes.  Read-only
267// transactions can be configured to read at timestamps in the past.
268//
269// Read-only transactions do not take locks. Instead, they work by choosing a
270// Cloud Spanner timestamp, then executing all reads at that timestamp. Since they do
271// not acquire locks, they do not block concurrent read-write transactions.
272//
273// Unlike locking read-write transactions, read-only transactions never
274// abort. They can fail if the chosen read timestamp is garbage collected;
275// however, the default garbage collection policy is generous enough that most
276// applications do not need to worry about this in practice. See the
277// documentation of TimestampBound for more details.
278//
279// A ReadOnlyTransaction consumes resources on the server until Close is
280// called.
type ReadOnlyTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly

	// singleUse indicates that the transaction can be used for only one read.
	singleUse bool

	// sp is the session pool for allocating a session to execute the read-only transaction. It is set only once during initialization of the ReadOnlyTransaction.
	sp *sessionPool
	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
	mu sync.Mutex
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the ReadOnlyTransaction.
	// While the transaction is in txInit, a non-nil tx means that some caller
	// has already taken ownership of initializing it.
	tx transactionID
	// txReadyOrClosed is for broadcasting that transaction ID has been returned by Cloud Spanner or that transaction is closed.
	// begin closes and replaces the channel when it finishes; Close closes it
	// without replacing it.
	txReadyOrClosed chan struct{}
	// state is the current transaction status of the ReadOnly transaction.
	state txState
	// sh is the sessionHandle allocated from sp.
	sh *sessionHandle
	// rts is the read timestamp returned by transactional reads.
	// It is the zero time until a timestamp has been recorded.
	rts time.Time
	// tb is the read staleness bound specification for transactional reads.
	tb TimestampBound
}
305
306// errTxInitTimeout returns error for timeout in waiting for initialization of the transaction.
307func errTxInitTimeout() error {
308	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
309}
310
311// getTimestampBound returns the read staleness bound specified for the ReadOnlyTransaction.
312func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
313	t.mu.Lock()
314	defer t.mu.Unlock()
315	return t.tb
316}
317
318// begin starts a snapshot read-only Transaction on Cloud Spanner.
319func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
320	var (
321		locked bool
322		tx     transactionID
323		rts    time.Time
324		sh     *sessionHandle
325		err    error
326	)
327	defer func() {
328		if !locked {
329			t.mu.Lock()
330			// Not necessary, just to make it clear that t.mu is being held when locked == true.
331			locked = true
332		}
333		if t.state != txClosed {
334			// Signal other initialization routines.
335			close(t.txReadyOrClosed)
336			t.txReadyOrClosed = make(chan struct{})
337		}
338		t.mu.Unlock()
339		if err != nil && sh != nil {
340			// Got a valid session handle, but failed to initialize transaction on Cloud Spanner.
341			if shouldDropSession(err) {
342				sh.destroy()
343			}
344			// If sh.destroy was already executed, this becomes a noop.
345			sh.recycle()
346		}
347	}()
348	sh, err = t.sp.take(ctx)
349	if err != nil {
350		return err
351	}
352	err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
353		res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
354			Session: sh.getID(),
355			Options: &sppb.TransactionOptions{
356				Mode: &sppb.TransactionOptions_ReadOnly_{
357					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
358				},
359			},
360		})
361		if e != nil {
362			return e
363		}
364		tx = res.Id
365		if res.ReadTimestamp != nil {
366			rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
367		}
368		return nil
369	})
370	t.mu.Lock()
371	locked = true            // defer function will be executed with t.mu being held.
372	if t.state == txClosed { // During the execution of t.begin(), t.Close() was invoked.
373		return errSessionClosed(sh)
374	}
375	// If begin() fails, this allows other queries to take over the initialization.
376	t.tx = nil
377	if err == nil {
378		t.tx = tx
379		t.rts = rts
380		t.sh = sh
381		// State transite to txActive.
382		t.state = txActive
383	}
384	return err
385}
386
387// acquire implements txReadEnv.acquire.
388func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
389	if err := checkNestedTxn(ctx); err != nil {
390		return nil, nil, err
391	}
392	if t.singleUse {
393		return t.acquireSingleUse(ctx)
394	}
395	return t.acquireMultiUse(ctx)
396}
397
398func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
399	t.mu.Lock()
400	defer t.mu.Unlock()
401	switch t.state {
402	case txClosed:
403		// A closed single-use transaction can never be reused.
404		return nil, nil, errTxClosed()
405	case txNew:
406		t.state = txClosed
407		ts := &sppb.TransactionSelector{
408			Selector: &sppb.TransactionSelector_SingleUse{
409				SingleUse: &sppb.TransactionOptions{
410					Mode: &sppb.TransactionOptions_ReadOnly_{
411						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
412					},
413				},
414			},
415		}
416		sh, err := t.sp.take(ctx)
417		if err != nil {
418			return nil, nil, err
419		}
420		// Install session handle into t, which can be used for readonly operations later.
421		t.sh = sh
422		return sh, ts, nil
423	}
424	us := t.state
425	// SingleUse transaction should only be in either txNew state or txClosed state.
426	return nil, nil, errUnexpectedTxState(us)
427}
428
// acquireMultiUse returns the read environment of a multi-use read-only
// transaction, lazily beginning the transaction on Cloud Spanner on first use.
//
// The method loops over the transaction state, re-locking t.mu on every
// iteration:
//   - txClosed: fails with errTxClosed.
//   - txNew: moves to txInit and re-evaluates.
//   - txInit: if another caller already owns the initialization (t.tx is
//     non-nil), waits for it to finish or for ctx to end; otherwise claims
//     ownership by setting t.tx to an empty, non-nil ID and calls t.begin.
//   - txActive: returns the installed session handle and an ID selector.
//
// Every path releases t.mu before returning, continuing, or blocking.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// Transition to txInit so that no further TimestampBound change is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Wait for a transaction ID to become ready.
				// Capture the channel under the lock: begin replaces it when it finishes.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// The wait for initialization timed out or was canceled; return the error directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction.
			// The empty but non-nil ID makes concurrent callers take the waiting branch above.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			// TODO: consider adding a transaction option which allow queries to initiate transactions by themselves. Note that this option might not be
			// always good because the ID of the new transaction won't be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}
			// If t.begin() succeeded, t.state should have been changed to txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Reached only for a state value that none of the cases above matched.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
481
482func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
483	t.mu.Lock()
484	defer t.mu.Unlock()
485	if t.rts.IsZero() {
486		t.rts = ts
487	}
488}
489
490// release implements txReadEnv.release.
491func (t *ReadOnlyTransaction) release(err error) {
492	t.mu.Lock()
493	sh := t.sh
494	t.mu.Unlock()
495	if sh != nil { // sh could be nil if t.acquire() fails.
496		if shouldDropSession(err) {
497			sh.destroy()
498		}
499		if t.singleUse {
500			// If session handle is already destroyed, this becomes a noop.
501			sh.recycle()
502		}
503	}
504}
505
506// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads after being closed.
507func (t *ReadOnlyTransaction) Close() {
508	if t.singleUse {
509		return
510	}
511	t.mu.Lock()
512	if t.state != txClosed {
513		t.state = txClosed
514		close(t.txReadyOrClosed)
515	}
516	sh := t.sh
517	t.mu.Unlock()
518	if sh == nil {
519		return
520	}
521	// If session handle is already destroyed, this becomes a noop.
522	// If there are still active queries and if the recycled session is reused before they complete, Cloud Spanner will cancel them
523	// on behalf of the new transaction on the session.
524	if sh != nil {
525		sh.recycle()
526	}
527}
528
529// Timestamp returns the timestamp chosen to perform reads and
530// queries in this transaction. The value can only be read after some
531// read or query has either returned some data or completed without
532// returning any data.
533func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
534	t.mu.Lock()
535	defer t.mu.Unlock()
536	if t.rts.IsZero() {
537		return t.rts, errRtsUnavailable()
538	}
539	return t.rts, nil
540}
541
542// WithTimestampBound specifies the TimestampBound to use for read or query.
543// This can only be used before the first read or query is invoked. Note:
544// bounded staleness is not available with general ReadOnlyTransactions; use a
545// single-use ReadOnlyTransaction instead.
546//
547// The returned value is the ReadOnlyTransaction so calls can be chained.
548func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
549	t.mu.Lock()
550	defer t.mu.Unlock()
551	if t.state == txNew {
552		// Only allow to set TimestampBound before the first query.
553		t.tb = tb
554	}
555	return t
556}
557
558// ReadWriteTransaction provides a locking read-write transaction.
559//
560// This type of transaction is the only way to write data into Cloud Spanner;
561// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
562// transactions internally. These transactions rely on pessimistic locking and,
563// if necessary, two-phase commit. Locking read-write transactions may abort,
564// requiring the application to retry. However, the interface exposed by
565// (*Client).ReadWriteTransaction eliminates the need for applications to write
566// retry loops explicitly.
567//
568// Locking transactions may be used to atomically read-modify-write data
569// anywhere in a database. This type of transaction is externally consistent.
570//
571// Clients should attempt to minimize the amount of time a transaction is
572// active. Faster transactions commit with higher probability and cause less
573// contention. Cloud Spanner attempts to keep read locks active as long as the
574// transaction continues to do reads.  Long periods of inactivity at the client
575// may cause Cloud Spanner to release a transaction's locks and abort it.
576//
577// Reads performed within a transaction acquire locks on the data being
578// read. Writes can only be done at commit time, after all reads have been
579// completed. Conceptually, a read-write transaction consists of zero or more
580// reads or SQL queries followed by a commit.
581//
582// See (*Client).ReadWriteTransaction for an example.
583//
584// Semantics
585//
586// Cloud Spanner can commit the transaction if all read locks it acquired are still
587// valid at commit time, and it is able to acquire write locks for all
588// writes. Cloud Spanner can abort the transaction for any reason. If a commit
589// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has not
590// modified any user data in Cloud Spanner.
591//
592// Unless the transaction commits, Cloud Spanner makes no guarantees about how long
593// the transaction's locks were held for. It is an error to use Cloud Spanner locks
594// for any sort of mutual exclusion other than between Cloud Spanner transactions
595// themselves.
596//
597// Aborted transactions
598//
599// Application code does not need to retry explicitly; RunInTransaction will
600// automatically retry a transaction if an attempt results in an abort. The
601// lock priority of a transaction increases after each prior aborted
602// transaction, meaning that the next attempt has a slightly better chance of
603// success than before.
604//
605// Under some circumstances (e.g., many transactions attempting to modify the
606// same row(s)), a transaction can abort many times in a short period before
607// successfully committing. Thus, it is not a good idea to cap the number of
608// retries a transaction can attempt; instead, it is better to limit the total
609// amount of wall time spent retrying.
610//
611// Idle transactions
612//
613// A transaction is considered idle if it has no outstanding reads or SQL
614// queries and has not started a read or SQL query within the last 10
615// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't hold
616// on to locks indefinitely. In that case, the commit will fail with error
617// ABORTED.
618//
619// If this behavior is undesirable, periodically executing a simple SQL query
620// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
621// idle.
type ReadWriteTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// sh is the sessionHandle allocated from sp. It is set only once during the initialization of ReadWriteTransaction.
	sh *sessionHandle
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the ReadWriteTransaction.
	// It is set only once in ReadWriteTransaction.begin() during the initialization of ReadWriteTransaction.
	tx transactionID
	// mu protects concurrent access to the internal states of ReadWriteTransaction.
	mu sync.Mutex
	// state is the current transaction status of the read-write transaction.
	// begin moves it to txActive; commit and rollback move it to txClosed.
	state txState
	// wb is the set of buffered mutations waiting to be committed.
	// BufferWrite appends to it and commit converts it into the commit request.
	wb []*Mutation
}
637
638// BufferWrite adds a list of mutations to the set of updates that will be
639// applied when the transaction is committed. It does not actually apply the
640// write until the transaction is committed, so the operation does not
641// block. The effects of the write won't be visible to any reads (including
642// reads done in the same transaction) until the transaction commits.
643//
644// See the example for Client.ReadWriteTransaction.
645func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
646	t.mu.Lock()
647	defer t.mu.Unlock()
648	if t.state == txClosed {
649		return errTxClosed()
650	}
651	if t.state != txActive {
652		return errUnexpectedTxState(t.state)
653	}
654	t.wb = append(t.wb, ms...)
655	return nil
656}
657
658// Update executes a DML statement against the database. It returns the number of
659// affected rows.
660// Update returns an error if the statement is a query. However, the
661// query is executed, and any data read will be validated upon commit.
662func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
663	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
664	defer func() { trace.EndSpan(ctx, err) }()
665	req, sh, err := t.prepareExecuteSQL(ctx, stmt, sppb.ExecuteSqlRequest_NORMAL)
666	if err != nil {
667		return 0, err
668	}
669	resultSet, err := sh.getClient().ExecuteSql(ctx, req)
670	if err != nil {
671		return 0, err
672	}
673	if resultSet.Stats == nil {
674		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
675	}
676	return extractRowCount(resultSet.Stats)
677}
678
679// BatchUpdate groups one or more DML statements and sends them to Spanner in a
680// single RPC. This is an efficient way to execute multiple DML statements.
681//
682// A slice of counts is returned, where each count represents the number of
683// affected rows for the given query at the same index. If an error occurs,
684// counts will be returned up to the query that encountered the error.
685func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
686	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
687	defer func() { trace.EndSpan(ctx, err) }()
688
689	sh, ts, err := t.acquire(ctx)
690	if err != nil {
691		return nil, err
692	}
693	// Cloud Spanner will return "Session not found" on bad sessions.
694	sid := sh.getID()
695	if sid == "" {
696		// Might happen if transaction is closed in the middle of a API call.
697		return nil, errSessionClosed(sh)
698	}
699
700	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
701	for _, st := range stmts {
702		params, paramTypes, err := st.convertParams()
703		if err != nil {
704			return nil, err
705		}
706		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
707			Sql:        st.SQL,
708			Params:     params,
709			ParamTypes: paramTypes,
710		})
711	}
712
713	resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{
714		Session:     sh.getID(),
715		Transaction: ts,
716		Statements:  sppbStmts,
717		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
718	})
719	if err != nil {
720		return nil, err
721	}
722
723	var counts []int64
724	for _, rs := range resp.ResultSets {
725		count, err := extractRowCount(rs.Stats)
726		if err != nil {
727			return nil, err
728		}
729		counts = append(counts, count)
730	}
731	if resp.Status.Code != 0 {
732		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
733	}
734	return counts, nil
735}
736
737// acquire implements txReadEnv.acquire.
738func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
739	ts := &sppb.TransactionSelector{
740		Selector: &sppb.TransactionSelector_Id{
741			Id: t.tx,
742		},
743	}
744	t.mu.Lock()
745	defer t.mu.Unlock()
746	switch t.state {
747	case txClosed:
748		return nil, nil, errTxClosed()
749	case txActive:
750		return t.sh, ts, nil
751	}
752	return nil, nil, errUnexpectedTxState(t.state)
753}
754
755// release implements txReadEnv.release.
756func (t *ReadWriteTransaction) release(err error) {
757	t.mu.Lock()
758	sh := t.sh
759	t.mu.Unlock()
760	if sh != nil && shouldDropSession(err) {
761		sh.destroy()
762	}
763}
764
765func beginTransaction(ctx context.Context, sid string, client sppb.SpannerClient) (transactionID, error) {
766	var tx transactionID
767	err := runRetryable(ctx, func(ctx context.Context) error {
768		res, e := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
769			Session: sid,
770			Options: &sppb.TransactionOptions{
771				Mode: &sppb.TransactionOptions_ReadWrite_{
772					ReadWrite: &sppb.TransactionOptions_ReadWrite{},
773				},
774			},
775		})
776		if e != nil {
777			return e
778		}
779		tx = res.Id
780		return nil
781	})
782	if err != nil {
783		return nil, err
784	}
785	return tx, nil
786}
787
788// begin starts a read-write transacton on Cloud Spanner, it is always called before any of the public APIs.
789func (t *ReadWriteTransaction) begin(ctx context.Context) error {
790	if t.tx != nil {
791		t.state = txActive
792		return nil
793	}
794	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
795	if err == nil {
796		t.tx = tx
797		t.state = txActive
798		return nil
799	}
800	if shouldDropSession(err) {
801		t.sh.destroy()
802	}
803	return err
804}
805
806// commit tries to commit a readwrite transaction to Cloud Spanner. It also returns the commit timestamp for the transactions.
807func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
808	var ts time.Time
809	t.mu.Lock()
810	t.state = txClosed // No further operations after commit.
811	mPb, err := mutationsProto(t.wb)
812	t.mu.Unlock()
813	if err != nil {
814		return ts, err
815	}
816	// In case that sessionHandle was destroyed but transaction body fails to report it.
817	sid, client := t.sh.getID(), t.sh.getClient()
818	if sid == "" || client == nil {
819		return ts, errSessionClosed(t.sh)
820	}
821	err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
822		var trailer metadata.MD
823		res, e := client.Commit(ctx, &sppb.CommitRequest{
824			Session: sid,
825			Transaction: &sppb.CommitRequest_TransactionId{
826				TransactionId: t.tx,
827			},
828			Mutations: mPb,
829		}, grpc.Trailer(&trailer))
830		if e != nil {
831			return toSpannerErrorWithMetadata(e, trailer)
832		}
833		if tstamp := res.GetCommitTimestamp(); tstamp != nil {
834			ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
835		}
836		return nil
837	})
838	if shouldDropSession(err) {
839		t.sh.destroy()
840	}
841	return ts, err
842}
843
844// rollback is called when a commit is aborted or the transaction body runs into error.
845func (t *ReadWriteTransaction) rollback(ctx context.Context) {
846	t.mu.Lock()
847	// Forbid further operations on rollbacked transaction.
848	t.state = txClosed
849	t.mu.Unlock()
850	// In case that sessionHandle was destroyed but transaction body fails to report it.
851	sid, client := t.sh.getID(), t.sh.getClient()
852	if sid == "" || client == nil {
853		return
854	}
855	err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
856		_, e := client.Rollback(ctx, &sppb.RollbackRequest{
857			Session:       sid,
858			TransactionId: t.tx,
859		})
860		return e
861	})
862	if shouldDropSession(err) {
863		t.sh.destroy()
864	}
865}
866
867// runInTransaction executes f under a read-write transaction context.
868func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
869	var (
870		ts  time.Time
871		err error
872	)
873	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
874		// Try to commit if transaction body returns no error.
875		ts, err = t.commit(ctx)
876	}
877	if err != nil {
878		if isAbortErr(err) {
879			// Retry the transaction using the same session on ABORT error.
880			// Cloud Spanner will create the new transaction with the previous one's wound-wait priority.
881			err = errRetry(err)
882			return ts, err
883		}
884		// Not going to commit, according to API spec, should rollback the transaction.
885		t.rollback(ctx)
886		return ts, err
887	}
888	// err == nil, return commit timestamp.
889	return ts, nil
890}
891
// writeOnlyTransaction provides the most efficient way of doing write-only transactions. It essentially does blind writes to Cloud Spanner.
// Mutations are committed in a single-use read-write transaction, without a
// separate BeginTransaction call.
type writeOnlyTransaction struct {
	// sp is the session pool which writeOnlyTransaction uses to get Cloud Spanner sessions for blind writes.
	sp *sessionPool
}
897
// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once, unless one of the following happens:
//     1) Context times out.
//     2) An unretryable error (e.g. database not found) occurs.
//     3) There is a malformed Mutation object.
//
// It returns the commit timestamp reported by the server; the returned time
// stays zero if the commit fails or the response carries no timestamp. The
// session used for the last attempt, if any, is recycled before returning.
func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
	var (
		ts time.Time
		// sh is shared across retry attempts so that a still-usable session is reused.
		sh *sessionHandle
	)
	mPb, err := mutationsProto(ms)
	if err != nil {
		// Malformed mutation found, just return the error.
		return ts, err
	}
	// NOTE(review): the closure's own context parameter (ct) is unused; the
	// body uses the outer ctx instead — confirm that runRetryable passes the
	// same context through.
	err = runRetryable(ctx, func(ct context.Context) error {
		var e error
		var trailers metadata.MD
		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
			// No usable session for doing the commit, take one from pool.
			sh, e = t.sp.take(ctx)
			if e != nil {
				// sessionPool.Take already retries for session creations/retrievals.
				return e
			}
		}
		res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
			Session: sh.getID(),
			Transaction: &sppb.CommitRequest_SingleUseTransaction{
				SingleUseTransaction: &sppb.TransactionOptions{
					Mode: &sppb.TransactionOptions_ReadWrite_{
						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
					},
				},
			},
			Mutations: mPb,
		}, grpc.Trailer(&trailers))
		if e != nil {
			if isAbortErr(e) {
				// Mask ABORT error as retryable, because aborted transactions are allowed to be retried.
				return errRetry(toSpannerErrorWithMetadata(e, trailers))
			}
			if shouldDropSession(e) {
				// Discard the bad session.
				// A destroyed handle reports an empty ID, so the check at the
				// top of the closure takes a fresh session on the next attempt.
				sh.destroy()
			}
			return e
		}
		if tstamp := res.GetCommitTimestamp(); tstamp != nil {
			ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
		}
		return nil
	})
	if sh != nil {
		sh.recycle()
	}
	return ts, err
}
955
956// isAbortedErr returns true if the error indicates that an gRPC call is aborted on the server side.
957func isAbortErr(err error) bool {
958	if err == nil {
959		return false
960	}
961	if ErrCode(err) == codes.Aborted {
962		return true
963	}
964	return false
965}
966