1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	"google.golang.org/api/iterator"
27	sppb "google.golang.org/genproto/googleapis/spanner/v1"
28	"google.golang.org/grpc"
29	"google.golang.org/grpc/codes"
30	"google.golang.org/grpc/metadata"
31)
32
// transactionID stores a transaction ID which uniquely identifies a transaction
// in Cloud Spanner. It holds the raw Id bytes returned by a BeginTransaction
// call.
type transactionID []byte
36
// txReadEnv manages a read-transaction environment consisting of a session
// handle and a transaction selector.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to
	// perform a transactional read.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp sets the transaction's read timestamp.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal
	// with session recycling. The error passed in is the outcome of the read.
	release(error)
}
49
// txReadOnly contains methods for doing transactional reads.
type txReadOnly struct {
	// read-transaction environment for performing transactional read
	// operations.
	txReadEnv

	// Atomic. Only needed for DML statements, but used for all statements.
	// It is incremented with atomic.AddInt64 for every ExecuteSql /
	// ExecuteBatchDml request that is built.
	sequenceNumber int64
}
59
60// errSessionClosed returns error for using a recycled/destroyed session
61func errSessionClosed(sh *sessionHandle) error {
62	return spannerErrorf(codes.FailedPrecondition,
63		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
64}
65
66// Read returns a RowIterator for reading multiple rows from the database.
67func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
68	return t.ReadWithOptions(ctx, table, keys, columns, nil)
69}
70
71// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
72func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
73	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
74}
75
// ReadOptions provides options for reading rows from a database. The zero
// value selects no index and imposes no row limit.
type ReadOptions struct {
	// The index to use for reading. If non-empty, you can only read columns
	// that are part of the index key, part of the primary key, or stored in the
	// index due to a STORING clause in the index definition.
	Index string

	// The maximum number of rows to read. A limit value less than 1 means no
	// limit.
	Limit int
}
87
88// ReadWithOptions returns a RowIterator for reading multiple rows from the
89// database. Pass a ReadOptions to modify the read operation.
90func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
91	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
92	defer func() { trace.EndSpan(ctx, ri.err) }()
93	var (
94		sh  *sessionHandle
95		ts  *sppb.TransactionSelector
96		err error
97	)
98	kset, err := keys.keySetProto()
99	if err != nil {
100		return &RowIterator{err: err}
101	}
102	if sh, ts, err = t.acquire(ctx); err != nil {
103		return &RowIterator{err: err}
104	}
105	// Cloud Spanner will return "Session not found" on bad sessions.
106	sid, client := sh.getID(), sh.getClient()
107	if sid == "" || client == nil {
108		// Might happen if transaction is closed in the middle of a API call.
109		return &RowIterator{err: errSessionClosed(sh)}
110	}
111	index := ""
112	limit := 0
113	if opts != nil {
114		index = opts.Index
115		if opts.Limit > 0 {
116			limit = opts.Limit
117		}
118	}
119	return stream(
120		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
121		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
122			return client.StreamingRead(ctx,
123				&sppb.ReadRequest{
124					Session:     sid,
125					Transaction: ts,
126					Table:       table,
127					Index:       index,
128					Columns:     columns,
129					KeySet:      kset,
130					ResumeToken: resumeToken,
131					Limit:       int64(limit),
132				})
133		},
134		t.setTimestamp,
135		t.release,
136	)
137}
138
139// errRowNotFound returns error for not being able to read the row identified by
140// key.
141func errRowNotFound(table string, key Key) error {
142	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
143}
144
145// ReadRow reads a single row from the database.
146//
147// If no row is present with the given key, then ReadRow returns an error where
148// spanner.ErrCode(err) is codes.NotFound.
149func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
150	iter := t.Read(ctx, table, key, columns)
151	defer iter.Stop()
152	row, err := iter.Next()
153	switch err {
154	case iterator.Done:
155		return nil, errRowNotFound(table, key)
156	case nil:
157		return row, nil
158	default:
159		return nil, err
160	}
161}
162
163// Query executes a query against the database. It returns a RowIterator for
164// retrieving the resulting rows.
165//
166// Query returns only row data, without a query plan or execution statistics.
167// Use QueryWithStats to get rows along with the plan and statistics. Use
168// AnalyzeQuery to get just the plan.
169func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
170	return t.query(ctx, statement, sppb.ExecuteSqlRequest_NORMAL)
171}
172
173// Query executes a SQL statement against the database. It returns a RowIterator
174// for retrieving the resulting rows. The RowIterator will also be populated
175// with a query plan and execution statistics.
176func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
177	return t.query(ctx, statement, sppb.ExecuteSqlRequest_PROFILE)
178}
179
180// AnalyzeQuery returns the query plan for statement.
181func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
182	iter := t.query(ctx, statement, sppb.ExecuteSqlRequest_PLAN)
183	defer iter.Stop()
184	for {
185		_, err := iter.Next()
186		if err == iterator.Done {
187			break
188		}
189		if err != nil {
190			return nil, err
191		}
192	}
193	if iter.QueryPlan == nil {
194		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
195	}
196	return iter.QueryPlan, nil
197}
198
199func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.ExecuteSqlRequest_QueryMode) (ri *RowIterator) {
200	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
201	defer func() { trace.EndSpan(ctx, ri.err) }()
202	req, sh, err := t.prepareExecuteSQL(ctx, statement, mode)
203	if err != nil {
204		return &RowIterator{err: err}
205	}
206	client := sh.getClient()
207	return stream(
208		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
209		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
210			req.ResumeToken = resumeToken
211			return client.ExecuteStreamingSql(ctx, req)
212		},
213		t.setTimestamp,
214		t.release)
215}
216
217func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, mode sppb.ExecuteSqlRequest_QueryMode) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
218	sh, ts, err := t.acquire(ctx)
219	if err != nil {
220		return nil, nil, err
221	}
222	// Cloud Spanner will return "Session not found" on bad sessions.
223	sid := sh.getID()
224	if sid == "" {
225		// Might happen if transaction is closed in the middle of a API call.
226		return nil, nil, errSessionClosed(sh)
227	}
228	params, paramTypes, err := stmt.convertParams()
229	if err != nil {
230		return nil, nil, err
231	}
232	req := &sppb.ExecuteSqlRequest{
233		Session:     sid,
234		Transaction: ts,
235		Sql:         stmt.SQL,
236		QueryMode:   mode,
237		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
238		Params:      params,
239		ParamTypes:  paramTypes,
240	}
241	return req, sh, nil
242}
243
// txState is the status of a transaction.
type txState int

const (
	// transaction is new, waiting to be initialized. This is the zero value
	// of txState.
	txNew txState = iota
	// transaction is being initialized.
	txInit
	// transaction is active and can perform read/write.
	txActive
	// transaction is closed, cannot be used anymore.
	txClosed
)
257
258// errRtsUnavailable returns error for read transaction's read timestamp being
259// unavailable.
260func errRtsUnavailable() error {
261	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
262}
263
264// errTxClosed returns error for using a closed transaction.
265func errTxClosed() error {
266	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
267}
268
269// errUnexpectedTxState returns error for transaction enters an unexpected state.
270func errUnexpectedTxState(ts txState) error {
271	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
272}
273
// ReadOnlyTransaction provides a snapshot transaction with guaranteed
// consistency across reads, but does not allow writes.  Read-only transactions
// can be configured to read at timestamps in the past.
//
// Read-only transactions do not take locks. Instead, they work by choosing a
// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
// they do not acquire locks, they do not block concurrent read-write
// transactions.
//
// Unlike locking read-write transactions, read-only transactions never abort.
// They can fail if the chosen read timestamp is garbage collected; however, the
// default garbage collection policy is generous enough that most applications
// do not need to worry about this in practice. See the documentation of
// TimestampBound for more details.
//
// A ReadOnlyTransaction consumes resources on the server until Close is called.
type ReadOnlyTransaction struct {
	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
	mu sync.Mutex
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// singleUse indicates that the transaction can be used for only one read.
	singleUse bool
	// sp is the session pool for allocating a session to execute the read-only
	// transaction. It is set only once during initialization of the
	// ReadOnlyTransaction.
	sp *sessionPool
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadOnlyTransaction. While a multi-use transaction is being initialized,
	// a non-nil tx also marks that some caller has taken ownership of the
	// initialization.
	tx transactionID
	// txReadyOrClosed is for broadcasting that transaction ID has been returned
	// by Cloud Spanner or that transaction is closed.
	txReadyOrClosed chan struct{}
	// state is the current transaction status of the ReadOnly transaction.
	state txState
	// sh is the sessionHandle allocated from sp.
	sh *sessionHandle
	// rts is the read timestamp returned by transactional reads.
	rts time.Time
	// tb is the read staleness bound specification for transactional reads.
	tb TimestampBound
}
316
317// errTxInitTimeout returns error for timeout in waiting for initialization of
318// the transaction.
319func errTxInitTimeout() error {
320	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
321}
322
323// getTimestampBound returns the read staleness bound specified for the
324// ReadOnlyTransaction.
325func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
326	t.mu.Lock()
327	defer t.mu.Unlock()
328	return t.tb
329}
330
331// begin starts a snapshot read-only Transaction on Cloud Spanner.
332func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
333	var (
334		locked bool
335		tx     transactionID
336		rts    time.Time
337		sh     *sessionHandle
338		err    error
339	)
340	defer func() {
341		if !locked {
342			t.mu.Lock()
343			// Not necessary, just to make it clear that t.mu is being held when
344			// locked == true.
345			locked = true
346		}
347		if t.state != txClosed {
348			// Signal other initialization routines.
349			close(t.txReadyOrClosed)
350			t.txReadyOrClosed = make(chan struct{})
351		}
352		t.mu.Unlock()
353		if err != nil && sh != nil {
354			// Got a valid session handle, but failed to initialize transaction=
355			// on Cloud Spanner.
356			if shouldDropSession(err) {
357				sh.destroy()
358			}
359			// If sh.destroy was already executed, this becomes a noop.
360			sh.recycle()
361		}
362	}()
363	sh, err = t.sp.take(ctx)
364	if err != nil {
365		return err
366	}
367	err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
368		res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
369			Session: sh.getID(),
370			Options: &sppb.TransactionOptions{
371				Mode: &sppb.TransactionOptions_ReadOnly_{
372					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
373				},
374			},
375		})
376		if e != nil {
377			return e
378		}
379		tx = res.Id
380		if res.ReadTimestamp != nil {
381			rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
382		}
383		return nil
384	})
385	t.mu.Lock()
386
387	// defer function will be executed with t.mu being held.
388	locked = true
389
390	// During the execution of t.begin(), t.Close() was invoked.
391	if t.state == txClosed {
392		return errSessionClosed(sh)
393	}
394
395	// If begin() fails, this allows other queries to take over the
396	// initialization.
397	t.tx = nil
398	if err == nil {
399		t.tx = tx
400		t.rts = rts
401		t.sh = sh
402		// State transite to txActive.
403		t.state = txActive
404	}
405	return err
406}
407
408// acquire implements txReadEnv.acquire.
409func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
410	if err := checkNestedTxn(ctx); err != nil {
411		return nil, nil, err
412	}
413	if t.singleUse {
414		return t.acquireSingleUse(ctx)
415	}
416	return t.acquireMultiUse(ctx)
417}
418
419func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
420	t.mu.Lock()
421	defer t.mu.Unlock()
422	switch t.state {
423	case txClosed:
424		// A closed single-use transaction can never be reused.
425		return nil, nil, errTxClosed()
426	case txNew:
427		t.state = txClosed
428		ts := &sppb.TransactionSelector{
429			Selector: &sppb.TransactionSelector_SingleUse{
430				SingleUse: &sppb.TransactionOptions{
431					Mode: &sppb.TransactionOptions_ReadOnly_{
432						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
433					},
434				},
435			},
436		}
437		sh, err := t.sp.take(ctx)
438		if err != nil {
439			return nil, nil, err
440		}
441
442		// Install session handle into t, which can be used for readonly
443		// operations later.
444		t.sh = sh
445		return sh, ts, nil
446	}
447	us := t.state
448
449	// SingleUse transaction should only be in either txNew state or txClosed
450	// state.
451	return nil, nil, errUnexpectedTxState(us)
452}
453
// acquireMultiUse returns the session handle and transaction selector of a
// multi-use read-only transaction, initializing the transaction on first use.
//
// The method loops over the transaction state under t.mu:
//   - txClosed: fail with errTxClosed.
//   - txNew:    move to txInit and re-evaluate.
//   - txInit:   if another caller already owns the initialization (t.tx is
//     non-nil), wait until it signals txReadyOrClosed or ctx is done;
//     otherwise take ownership and run t.begin.
//   - txActive: return t.sh together with a selector for t.tx.
//
// t.mu is never held while waiting or while t.begin is running.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// State transitions to txInit so that no further TimestampBound
			// change is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Wait for a transaction ID to become ready. The channel is
				// captured under the lock because t.begin replaces it after
				// closing it.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// Waiting for initialization timed out or was canceled;
					// return the error directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction. The empty,
			// non-nil ID acts as the "initialization in progress" marker.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			//
			// TODO: consider adding a transaction option which allow queries to
			//  initiate transactions by themselves. Note that this option might
			//  not be always good because the ID of the new transaction won't
			//  be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}

			// If t.begin() succeeded, t.state should have been changed to
			// txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Any state not handled above is unexpected.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
513
514func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
515	t.mu.Lock()
516	defer t.mu.Unlock()
517	if t.rts.IsZero() {
518		t.rts = ts
519	}
520}
521
522// release implements txReadEnv.release.
523func (t *ReadOnlyTransaction) release(err error) {
524	t.mu.Lock()
525	sh := t.sh
526	t.mu.Unlock()
527	if sh != nil { // sh could be nil if t.acquire() fails.
528		if shouldDropSession(err) {
529			sh.destroy()
530		}
531		if t.singleUse {
532			// If session handle is already destroyed, this becomes a noop.
533			sh.recycle()
534		}
535	}
536}
537
538// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
539// after being closed.
540func (t *ReadOnlyTransaction) Close() {
541	if t.singleUse {
542		return
543	}
544	t.mu.Lock()
545	if t.state != txClosed {
546		t.state = txClosed
547		close(t.txReadyOrClosed)
548	}
549	sh := t.sh
550	t.mu.Unlock()
551	if sh == nil {
552		return
553	}
554	// If session handle is already destroyed, this becomes a noop. If there are
555	// still active queries and if the recycled session is reused before they
556	// complete, Cloud Spanner will cancel them on behalf of the new transaction
557	// on the session.
558	if sh != nil {
559		sh.recycle()
560	}
561}
562
563// Timestamp returns the timestamp chosen to perform reads and queries in this
564// transaction. The value can only be read after some read or query has either
565// returned some data or completed without returning any data.
566func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
567	t.mu.Lock()
568	defer t.mu.Unlock()
569	if t.rts.IsZero() {
570		return t.rts, errRtsUnavailable()
571	}
572	return t.rts, nil
573}
574
575// WithTimestampBound specifies the TimestampBound to use for read or query.
576// This can only be used before the first read or query is invoked. Note:
577// bounded staleness is not available with general ReadOnlyTransactions; use a
578// single-use ReadOnlyTransaction instead.
579//
580// The returned value is the ReadOnlyTransaction so calls can be chained.
581func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
582	t.mu.Lock()
583	defer t.mu.Unlock()
584	if t.state == txNew {
585		// Only allow to set TimestampBound before the first query.
586		t.tb = tb
587	}
588	return t
589}
590
// ReadWriteTransaction provides a locking read-write transaction.
//
// This type of transaction is the only way to write data into Cloud Spanner;
// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
// transactions internally. These transactions rely on pessimistic locking and,
// if necessary, two-phase commit. Locking read-write transactions may abort,
// requiring the application to retry. However, the interface exposed by
// (*Client).ReadWriteTransaction eliminates the need for applications to write
// retry loops explicitly.
//
// Locking transactions may be used to atomically read-modify-write data
// anywhere in a database. This type of transaction is externally consistent.
//
// Clients should attempt to minimize the amount of time a transaction is
// active. Faster transactions commit with higher probability and cause less
// contention. Cloud Spanner attempts to keep read locks active as long as the
// transaction continues to do reads.  Long periods of inactivity at the client
// may cause Cloud Spanner to release a transaction's locks and abort it.
//
// Reads performed within a transaction acquire locks on the data being
// read. Writes can only be done at commit time, after all reads have been
// completed. Conceptually, a read-write transaction consists of zero or more
// reads or SQL queries followed by a commit.
//
// See (*Client).ReadWriteTransaction for an example.
//
// Semantics
//
// Cloud Spanner can commit the transaction if all read locks it acquired are
// still valid at commit time, and it is able to acquire write locks for all
// writes. Cloud Spanner can abort the transaction for any reason. If a commit
// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
// not modified any user data in Cloud Spanner.
//
// Unless the transaction commits, Cloud Spanner makes no guarantees about how
// long the transaction's locks were held for. It is an error to use Cloud
// Spanner locks for any sort of mutual exclusion other than between Cloud
// Spanner transactions themselves.
//
// Aborted transactions
//
// Application code does not need to retry explicitly; RunInTransaction will
// automatically retry a transaction if an attempt results in an abort. The lock
// priority of a transaction increases after each prior aborted transaction,
// meaning that the next attempt has a slightly better chance of success than
// before.
//
// Under some circumstances (e.g., many transactions attempting to modify the
// same row(s)), a transaction can abort many times in a short period before
// successfully committing. Thus, it is not a good idea to cap the number of
// retries a transaction can attempt; instead, it is better to limit the total
// amount of wall time spent retrying.
//
// Idle transactions
//
// A transaction is considered idle if it has no outstanding reads or SQL
// queries and has not started a read or SQL query within the last 10
// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
// hold on to locks indefinitely. In that case, the commit will fail with error
// ABORTED.
//
// If this behavior is undesirable, periodically executing a simple SQL query
// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
// idle.
type ReadWriteTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// sh is the sessionHandle allocated from sp. It is set only once during the
	// initialization of ReadWriteTransaction.
	sh *sessionHandle
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
	// during the initialization of ReadWriteTransaction.
	tx transactionID
	// mu protects concurrent access to the internal states of
	// ReadWriteTransaction.
	mu sync.Mutex
	// state is the current transaction status of the read-write transaction.
	state txState
	// wb is the set of buffered mutations waiting to be committed. It is
	// appended to by BufferWrite and converted to protos at commit time.
	wb []*Mutation
}
673
674// BufferWrite adds a list of mutations to the set of updates that will be
675// applied when the transaction is committed. It does not actually apply the
676// write until the transaction is committed, so the operation does not block.
677// The effects of the write won't be visible to any reads (including reads done
678// in the same transaction) until the transaction commits.
679//
680// See the example for Client.ReadWriteTransaction.
681func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
682	t.mu.Lock()
683	defer t.mu.Unlock()
684	if t.state == txClosed {
685		return errTxClosed()
686	}
687	if t.state != txActive {
688		return errUnexpectedTxState(t.state)
689	}
690	t.wb = append(t.wb, ms...)
691	return nil
692}
693
694// Update executes a DML statement against the database. It returns the number
695// of affected rows. Update returns an error if the statement is a query.
696// However, the query is executed, and any data read will be validated upon
697// commit.
698func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
699	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
700	defer func() { trace.EndSpan(ctx, err) }()
701	req, sh, err := t.prepareExecuteSQL(ctx, stmt, sppb.ExecuteSqlRequest_NORMAL)
702	if err != nil {
703		return 0, err
704	}
705	resultSet, err := sh.getClient().ExecuteSql(ctx, req)
706	if err != nil {
707		return 0, err
708	}
709	if resultSet.Stats == nil {
710		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
711	}
712	return extractRowCount(resultSet.Stats)
713}
714
715// BatchUpdate groups one or more DML statements and sends them to Spanner in a
716// single RPC. This is an efficient way to execute multiple DML statements.
717//
718// A slice of counts is returned, where each count represents the number of
719// affected rows for the given query at the same index. If an error occurs,
720// counts will be returned up to the query that encountered the error.
721func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
722	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
723	defer func() { trace.EndSpan(ctx, err) }()
724
725	sh, ts, err := t.acquire(ctx)
726	if err != nil {
727		return nil, err
728	}
729	// Cloud Spanner will return "Session not found" on bad sessions.
730	sid := sh.getID()
731	if sid == "" {
732		// Might happen if transaction is closed in the middle of a API call.
733		return nil, errSessionClosed(sh)
734	}
735
736	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
737	for _, st := range stmts {
738		params, paramTypes, err := st.convertParams()
739		if err != nil {
740			return nil, err
741		}
742		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
743			Sql:        st.SQL,
744			Params:     params,
745			ParamTypes: paramTypes,
746		})
747	}
748
749	resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{
750		Session:     sh.getID(),
751		Transaction: ts,
752		Statements:  sppbStmts,
753		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
754	})
755	if err != nil {
756		return nil, err
757	}
758
759	var counts []int64
760	for _, rs := range resp.ResultSets {
761		count, err := extractRowCount(rs.Stats)
762		if err != nil {
763			return nil, err
764		}
765		counts = append(counts, count)
766	}
767	if resp.Status.Code != 0 {
768		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
769	}
770	return counts, nil
771}
772
773// acquire implements txReadEnv.acquire.
774func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
775	ts := &sppb.TransactionSelector{
776		Selector: &sppb.TransactionSelector_Id{
777			Id: t.tx,
778		},
779	}
780	t.mu.Lock()
781	defer t.mu.Unlock()
782	switch t.state {
783	case txClosed:
784		return nil, nil, errTxClosed()
785	case txActive:
786		return t.sh, ts, nil
787	}
788	return nil, nil, errUnexpectedTxState(t.state)
789}
790
791// release implements txReadEnv.release.
792func (t *ReadWriteTransaction) release(err error) {
793	t.mu.Lock()
794	sh := t.sh
795	t.mu.Unlock()
796	if sh != nil && shouldDropSession(err) {
797		sh.destroy()
798	}
799}
800
801func beginTransaction(ctx context.Context, sid string, client sppb.SpannerClient) (transactionID, error) {
802	var tx transactionID
803	err := runRetryable(ctx, func(ctx context.Context) error {
804		res, e := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
805			Session: sid,
806			Options: &sppb.TransactionOptions{
807				Mode: &sppb.TransactionOptions_ReadWrite_{
808					ReadWrite: &sppb.TransactionOptions_ReadWrite{},
809				},
810			},
811		})
812		if e != nil {
813			return e
814		}
815		tx = res.Id
816		return nil
817	})
818	if err != nil {
819		return nil, err
820	}
821	return tx, nil
822}
823
824// begin starts a read-write transacton on Cloud Spanner, it is always called
825// before any of the public APIs.
826func (t *ReadWriteTransaction) begin(ctx context.Context) error {
827	if t.tx != nil {
828		t.state = txActive
829		return nil
830	}
831	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
832	if err == nil {
833		t.tx = tx
834		t.state = txActive
835		return nil
836	}
837	if shouldDropSession(err) {
838		t.sh.destroy()
839	}
840	return err
841}
842
843// commit tries to commit a readwrite transaction to Cloud Spanner. It also
844// returns the commit timestamp for the transactions.
845func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
846	var ts time.Time
847	t.mu.Lock()
848	t.state = txClosed // No further operations after commit.
849	mPb, err := mutationsProto(t.wb)
850	t.mu.Unlock()
851	if err != nil {
852		return ts, err
853	}
854	// In case that sessionHandle was destroyed but transaction body fails to
855	// report it.
856	sid, client := t.sh.getID(), t.sh.getClient()
857	if sid == "" || client == nil {
858		return ts, errSessionClosed(t.sh)
859	}
860	err = runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
861		var trailer metadata.MD
862		res, e := client.Commit(ctx, &sppb.CommitRequest{
863			Session: sid,
864			Transaction: &sppb.CommitRequest_TransactionId{
865				TransactionId: t.tx,
866			},
867			Mutations: mPb,
868		}, grpc.Trailer(&trailer))
869		if e != nil {
870			return toSpannerErrorWithMetadata(e, trailer)
871		}
872		if tstamp := res.GetCommitTimestamp(); tstamp != nil {
873			ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
874		}
875		return nil
876	})
877	if shouldDropSession(err) {
878		t.sh.destroy()
879	}
880	return ts, err
881}
882
883// rollback is called when a commit is aborted or the transaction body runs
884// into error.
885func (t *ReadWriteTransaction) rollback(ctx context.Context) {
886	t.mu.Lock()
887	// Forbid further operations on rollbacked transaction.
888	t.state = txClosed
889	t.mu.Unlock()
890	// In case that sessionHandle was destroyed but transaction body fails to
891	// report it.
892	sid, client := t.sh.getID(), t.sh.getClient()
893	if sid == "" || client == nil {
894		return
895	}
896	err := runRetryable(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), func(ctx context.Context) error {
897		_, e := client.Rollback(ctx, &sppb.RollbackRequest{
898			Session:       sid,
899			TransactionId: t.tx,
900		})
901		return e
902	})
903	if shouldDropSession(err) {
904		t.sh.destroy()
905	}
906}
907
908// runInTransaction executes f under a read-write transaction context.
909func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
910	var (
911		ts  time.Time
912		err error
913	)
914	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
915		// Try to commit if transaction body returns no error.
916		ts, err = t.commit(ctx)
917	}
918	if err != nil {
919		if isAbortErr(err) {
920			// Retry the transaction using the same session on ABORT error.
921			// Cloud Spanner will create the new transaction with the previous
922			// one's wound-wait priority.
923			err = errRetry(err)
924			return ts, err
925		}
926		// Not going to commit, according to API spec, should rollback the
927		// transaction.
928		t.rollback(ctx)
929		return ts, err
930	}
931	// err == nil, return commit timestamp.
932	return ts, nil
933}
934
// writeOnlyTransaction provides the most efficient way of doing write-only
// transactions. It essentially does blind writes to Cloud Spanner: mutations
// are committed in a single-use read-write transaction without a preceding
// BeginTransaction call.
type writeOnlyTransaction struct {
	// sp is the session pool which writeOnlyTransaction uses to get Cloud
	// Spanner sessions for blind writes.
	sp *sessionPool
}
942
943// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
944// unless one of the following happens:
945//
946//     1) Context times out.
947//     2) An unretryable error (e.g. database not found) occurs.
948//     3) There is a malformed Mutation object.
949func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
950	var (
951		ts time.Time
952		sh *sessionHandle
953	)
954	mPb, err := mutationsProto(ms)
955	if err != nil {
956		// Malformed mutation found, just return the error.
957		return ts, err
958	}
959	err = runRetryable(ctx, func(ct context.Context) error {
960		var e error
961		var trailers metadata.MD
962		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
963			// No usable session for doing the commit, take one from pool.
964			sh, e = t.sp.take(ctx)
965			if e != nil {
966				// sessionPool.Take already retries for session
967				// creations/retrivals.
968				return e
969			}
970		}
971		res, e := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
972			Session: sh.getID(),
973			Transaction: &sppb.CommitRequest_SingleUseTransaction{
974				SingleUseTransaction: &sppb.TransactionOptions{
975					Mode: &sppb.TransactionOptions_ReadWrite_{
976						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
977					},
978				},
979			},
980			Mutations: mPb,
981		}, grpc.Trailer(&trailers))
982		if e != nil {
983			if isAbortErr(e) {
984				// Mask ABORT error as retryable, because aborted transactions
985				// are allowed to be retried.
986				return errRetry(toSpannerErrorWithMetadata(e, trailers))
987			}
988			if shouldDropSession(e) {
989				// Discard the bad session.
990				sh.destroy()
991			}
992			return e
993		}
994		if tstamp := res.GetCommitTimestamp(); tstamp != nil {
995			ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
996		}
997		return nil
998	})
999	if sh != nil {
1000		sh.recycle()
1001	}
1002	return ts, err
1003}
1004
1005// isAbortedErr returns true if the error indicates that an gRPC call is
1006// aborted on the server side.
1007func isAbortErr(err error) bool {
1008	if err == nil {
1009		return false
1010	}
1011	if ErrCode(err) == codes.Aborted {
1012		return true
1013	}
1014	return false
1015}
1016