1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	vkit "cloud.google.com/go/spanner/apiv1"
27	"google.golang.org/api/iterator"
28	sppb "google.golang.org/genproto/googleapis/spanner/v1"
29	"google.golang.org/grpc/codes"
30)
31
// transactionID stores a transaction ID which uniquely identifies a transaction
// in Cloud Spanner. It holds the Id returned by BeginTransaction and is sent
// back in TransactionSelector, CommitRequest and RollbackRequest messages.
type transactionID []byte
35
// txReadEnv manages a read-transaction environment consisting of a session
// handle and a transaction selector.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to
	// perform a transactional read: the session handle to send the request on
	// and the selector identifying the transaction to read in.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp records the transaction's read timestamp as reported by a
	// read.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal
	// with session recycling. Its argument is the error the read finished
	// with, if any.
	release(error)
}
48
// txReadOnly contains methods for doing transactional reads. It is embedded in
// both ReadOnlyTransaction and ReadWriteTransaction.
type txReadOnly struct {
	// read-transaction environment for performing transactional read
	// operations.
	txReadEnv

	// Atomic: only ever incremented with atomic.AddInt64. Only needed for DML
	// statements, but set on every ExecuteSql and ExecuteBatchDml request.
	sequenceNumber int64

	// replaceSessionFunc is a function that can be called to replace the
	// session that is used by the transaction. This function should only be
	// defined for single-use transactions that can safely be retried on a
	// different session. All other transactions will set this function to nil.
	replaceSessionFunc func(ctx context.Context) error

	// sp is the session pool for allocating a session to execute the read-only
	// transaction. It is set only once during initialization of the
	// txReadOnly.
	sp *sessionPool
	// sh is the sessionHandle allocated from sp.
	sh *sessionHandle
}
71
72// errSessionClosed returns error for using a recycled/destroyed session
73func errSessionClosed(sh *sessionHandle) error {
74	return spannerErrorf(codes.FailedPrecondition,
75		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
76}
77
78// Read returns a RowIterator for reading multiple rows from the database.
79func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
80	return t.ReadWithOptions(ctx, table, keys, columns, nil)
81}
82
83// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
84func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
85	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
86}
87
// ReadOptions provides options for reading rows from a database. Passing a nil
// *ReadOptions to ReadWithOptions is equivalent to passing the zero value.
type ReadOptions struct {
	// The index to use for reading. If non-empty, you can only read columns
	// that are part of the index key, part of the primary key, or stored in the
	// index due to a STORING clause in the index definition.
	Index string

	// The maximum number of rows to read. A limit value less than 1 means no
	// limit (it is sent to Cloud Spanner as 0).
	Limit int
}
99
100// ReadWithOptions returns a RowIterator for reading multiple rows from the
101// database. Pass a ReadOptions to modify the read operation.
102func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
103	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
104	defer func() { trace.EndSpan(ctx, ri.err) }()
105	var (
106		sh  *sessionHandle
107		ts  *sppb.TransactionSelector
108		err error
109	)
110	kset, err := keys.keySetProto()
111	if err != nil {
112		return &RowIterator{err: err}
113	}
114	if sh, ts, err = t.acquire(ctx); err != nil {
115		return &RowIterator{err: err}
116	}
117	// Cloud Spanner will return "Session not found" on bad sessions.
118	sid, client := sh.getID(), sh.getClient()
119	if sid == "" || client == nil {
120		// Might happen if transaction is closed in the middle of a API call.
121		return &RowIterator{err: errSessionClosed(sh)}
122	}
123	index := ""
124	limit := 0
125	if opts != nil {
126		index = opts.Index
127		if opts.Limit > 0 {
128			limit = opts.Limit
129		}
130	}
131	return stream(
132		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
133		sh.session.logger,
134		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
135			return client.StreamingRead(ctx,
136				&sppb.ReadRequest{
137					Session:     sid,
138					Transaction: ts,
139					Table:       table,
140					Index:       index,
141					Columns:     columns,
142					KeySet:      kset,
143					ResumeToken: resumeToken,
144					Limit:       int64(limit),
145				})
146		},
147		t.setTimestamp,
148		t.release,
149	)
150}
151
152// errRowNotFound returns error for not being able to read the row identified by
153// key.
154func errRowNotFound(table string, key Key) error {
155	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
156}
157
158// errRowNotFoundByIndex returns error for not being able to read the row by index.
159func errRowNotFoundByIndex(table string, key Key, index string) error {
160	return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
161}
162
163// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index.
164func errMultipleRowsFound(table string, key Key, index string) error {
165	return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
166}
167
168// ReadRow reads a single row from the database.
169//
170// If no row is present with the given key, then ReadRow returns an error where
171// spanner.ErrCode(err) is codes.NotFound.
172func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
173	iter := t.Read(ctx, table, key, columns)
174	defer iter.Stop()
175	row, err := iter.Next()
176	switch err {
177	case iterator.Done:
178		return nil, errRowNotFound(table, key)
179	case nil:
180		return row, nil
181	default:
182		return nil, err
183	}
184}
185
186// ReadRowUsingIndex reads a single row from the database using an index.
187//
188// If no row is present with the given index, then ReadRowUsingIndex returns an
189// error where spanner.ErrCode(err) is codes.NotFound.
190//
191// If more than one row received with the given index, then ReadRowUsingIndex
192// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition.
193func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) {
194	iter := t.ReadUsingIndex(ctx, table, index, key, columns)
195	defer iter.Stop()
196	row, err := iter.Next()
197	switch err {
198	case iterator.Done:
199		return nil, errRowNotFoundByIndex(table, key, index)
200	case nil:
201		// If more than one row found, return an error.
202		_, err := iter.Next()
203		switch err {
204		case iterator.Done:
205			return row, nil
206		case nil:
207			return nil, errMultipleRowsFound(table, key, index)
208		default:
209			return nil, err
210		}
211	default:
212		return nil, err
213	}
214}
215
216// Query executes a query against the database. It returns a RowIterator for
217// retrieving the resulting rows.
218//
219// Query returns only row data, without a query plan or execution statistics.
220// Use QueryWithStats to get rows along with the plan and statistics. Use
221// AnalyzeQuery to get just the plan.
222func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
223	return t.query(ctx, statement, sppb.ExecuteSqlRequest_NORMAL)
224}
225
226// Query executes a SQL statement against the database. It returns a RowIterator
227// for retrieving the resulting rows. The RowIterator will also be populated
228// with a query plan and execution statistics.
229func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
230	return t.query(ctx, statement, sppb.ExecuteSqlRequest_PROFILE)
231}
232
233// AnalyzeQuery returns the query plan for statement.
234func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
235	iter := t.query(ctx, statement, sppb.ExecuteSqlRequest_PLAN)
236	defer iter.Stop()
237	for {
238		_, err := iter.Next()
239		if err == iterator.Done {
240			break
241		}
242		if err != nil {
243			return nil, err
244		}
245	}
246	if iter.QueryPlan == nil {
247		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
248	}
249	return iter.QueryPlan, nil
250}
251
252func (t *txReadOnly) query(ctx context.Context, statement Statement, mode sppb.ExecuteSqlRequest_QueryMode) (ri *RowIterator) {
253	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
254	defer func() { trace.EndSpan(ctx, ri.err) }()
255	req, sh, err := t.prepareExecuteSQL(ctx, statement, mode)
256	if err != nil {
257		return &RowIterator{err: err}
258	}
259	client := sh.getClient()
260	return streamWithReplaceSessionFunc(
261		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
262		sh.session.logger,
263		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
264			req.ResumeToken = resumeToken
265			req.Session = t.sh.getID()
266			return client.ExecuteStreamingSql(ctx, req)
267		},
268		t.replaceSessionFunc,
269		t.setTimestamp,
270		t.release)
271}
272
273func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, mode sppb.ExecuteSqlRequest_QueryMode) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
274	sh, ts, err := t.acquire(ctx)
275	if err != nil {
276		return nil, nil, err
277	}
278	// Cloud Spanner will return "Session not found" on bad sessions.
279	sid := sh.getID()
280	if sid == "" {
281		// Might happen if transaction is closed in the middle of a API call.
282		return nil, nil, errSessionClosed(sh)
283	}
284	params, paramTypes, err := stmt.convertParams()
285	if err != nil {
286		return nil, nil, err
287	}
288	req := &sppb.ExecuteSqlRequest{
289		Session:     sid,
290		Transaction: ts,
291		Sql:         stmt.SQL,
292		QueryMode:   mode,
293		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
294		Params:      params,
295		ParamTypes:  paramTypes,
296	}
297	return req, sh, nil
298}
299
// txState is the status of a transaction.
//
// ReadOnlyTransaction (multi-use): txNew -> txInit on the first acquire,
// txInit -> txActive when begin succeeds, and any state -> txClosed on Close.
// A single-use ReadOnlyTransaction goes straight from txNew to txClosed on its
// only acquire. ReadWriteTransaction: begin sets txActive; commit and rollback
// set txClosed.
type txState int

const (
	// transaction is new, waiting to be initialized.
	txNew txState = iota
	// transaction is being initialized.
	txInit
	// transaction is active and can perform read/write.
	txActive
	// transaction is closed, cannot be used anymore.
	txClosed
)
313
314// errRtsUnavailable returns error for read transaction's read timestamp being
315// unavailable.
316func errRtsUnavailable() error {
317	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
318}
319
320// errTxClosed returns error for using a closed transaction.
321func errTxClosed() error {
322	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
323}
324
325// errUnexpectedTxState returns error for transaction enters an unexpected state.
326func errUnexpectedTxState(ts txState) error {
327	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
328}
329
// ReadOnlyTransaction provides a snapshot transaction with guaranteed
// consistency across reads, but does not allow writes. Read-only transactions
// can be configured to read at timestamps in the past.
//
// Read-only transactions do not take locks. Instead, they work by choosing a
// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
// they do not acquire locks, they do not block concurrent read-write
// transactions.
//
// Unlike locking read-write transactions, read-only transactions never abort.
// They can fail if the chosen read timestamp is garbage collected; however, the
// default garbage collection policy is generous enough that most applications
// do not need to worry about this in practice. See the documentation of
// TimestampBound for more details.
//
// A ReadOnlyTransaction consumes resources on the server until Close is called.
type ReadOnlyTransaction struct {
	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
	mu sync.Mutex
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// singleUse indicates that the transaction can be used for only one read.
	singleUse bool
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadOnlyTransaction. While state is txInit, a non-nil (possibly empty)
	// tx means some caller has taken ownership of initializing the
	// transaction; other callers wait on txReadyOrClosed.
	tx transactionID
	// txReadyOrClosed is for broadcasting that transaction ID has been returned
	// by Cloud Spanner or that transaction is closed. begin closes it when an
	// initialization attempt finishes and, unless the transaction is closed,
	// replaces it with a fresh channel.
	txReadyOrClosed chan struct{}
	// state is the current transaction status of the ReadOnly transaction.
	state txState
	// rts is the read timestamp returned by transactional reads. The zero
	// value means it is not known yet; setTimestamp only fills it in while it
	// is still zero.
	rts time.Time
	// tb is the read staleness bound specification for transactional reads.
	// WithTimestampBound only changes it while state is txNew.
	tb TimestampBound
}
366
367// errTxInitTimeout returns error for timeout in waiting for initialization of
368// the transaction.
369func errTxInitTimeout() error {
370	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
371}
372
373// getTimestampBound returns the read staleness bound specified for the
374// ReadOnlyTransaction.
375func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
376	t.mu.Lock()
377	defer t.mu.Unlock()
378	return t.tb
379}
380
381// begin starts a snapshot read-only Transaction on Cloud Spanner.
382func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
383	var (
384		locked bool
385		tx     transactionID
386		rts    time.Time
387		sh     *sessionHandle
388		err    error
389		res    *sppb.Transaction
390	)
391	defer func() {
392		if !locked {
393			t.mu.Lock()
394			// Not necessary, just to make it clear that t.mu is being held when
395			// locked == true.
396			locked = true
397		}
398		if t.state != txClosed {
399			// Signal other initialization routines.
400			close(t.txReadyOrClosed)
401			t.txReadyOrClosed = make(chan struct{})
402		}
403		t.mu.Unlock()
404		if err != nil && sh != nil {
405			// Got a valid session handle, but failed to initialize transaction=
406			// on Cloud Spanner.
407			if isSessionNotFoundError(err) {
408				sh.destroy()
409			}
410			// If sh.destroy was already executed, this becomes a noop.
411			sh.recycle()
412		}
413	}()
414	// Retry the BeginTransaction call if a 'Session not found' is returned.
415	for {
416		sh, err = t.sp.take(ctx)
417		if err != nil {
418			return err
419		}
420		res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
421			Session: sh.getID(),
422			Options: &sppb.TransactionOptions{
423				Mode: &sppb.TransactionOptions_ReadOnly_{
424					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
425				},
426			},
427		})
428		if isSessionNotFoundError(err) {
429			sh.destroy()
430			continue
431		} else if err == nil {
432			tx = res.Id
433			if res.ReadTimestamp != nil {
434				rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
435			}
436		} else {
437			err = toSpannerError(err)
438		}
439		break
440	}
441	t.mu.Lock()
442
443	// defer function will be executed with t.mu being held.
444	locked = true
445
446	// During the execution of t.begin(), t.Close() was invoked.
447	if t.state == txClosed {
448		return errSessionClosed(sh)
449	}
450
451	// If begin() fails, this allows other queries to take over the
452	// initialization.
453	t.tx = nil
454	if err == nil {
455		t.tx = tx
456		t.rts = rts
457		t.sh = sh
458		// State transite to txActive.
459		t.state = txActive
460	}
461	return err
462}
463
464// acquire implements txReadEnv.acquire.
465func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
466	if err := checkNestedTxn(ctx); err != nil {
467		return nil, nil, err
468	}
469	if t.singleUse {
470		return t.acquireSingleUse(ctx)
471	}
472	return t.acquireMultiUse(ctx)
473}
474
475func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
476	t.mu.Lock()
477	defer t.mu.Unlock()
478	switch t.state {
479	case txClosed:
480		// A closed single-use transaction can never be reused.
481		return nil, nil, errTxClosed()
482	case txNew:
483		t.state = txClosed
484		ts := &sppb.TransactionSelector{
485			Selector: &sppb.TransactionSelector_SingleUse{
486				SingleUse: &sppb.TransactionOptions{
487					Mode: &sppb.TransactionOptions_ReadOnly_{
488						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
489					},
490				},
491			},
492		}
493		sh, err := t.sp.take(ctx)
494		if err != nil {
495			return nil, nil, err
496		}
497
498		// Install session handle into t, which can be used for readonly
499		// operations later.
500		t.sh = sh
501		return sh, ts, nil
502	}
503	us := t.state
504
505	// SingleUse transaction should only be in either txNew state or txClosed
506	// state.
507	return nil, nil, errUnexpectedTxState(us)
508}
509
// acquireMultiUse returns the session handle and an ID-based transaction
// selector for a multi-use read-only transaction, beginning the transaction on
// Cloud Spanner first if needed.
//
// It loops over the transaction state, re-checking it under t.mu on every
// iteration:
//   - txNew: move to txInit (freezing the TimestampBound) and re-check.
//   - txInit with t.tx == nil: take ownership of initialization by setting
//     t.tx to an empty, non-nil ID, then call t.begin outside the lock.
//   - txInit with t.tx != nil: another caller is initializing; wait until
//     t.txReadyOrClosed is closed (then re-check) or ctx is done (then fail
//     with errTxInitTimeout).
//   - txActive: return t.sh and a selector for t.tx.
//   - txClosed: fail with errTxClosed.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// State transit to txInit so that no further TimestampBound change
			// is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Wait for a transaction ID to become ready. The channel is
				// snapshotted under the lock because begin replaces
				// t.txReadyOrClosed after closing it.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// Waiting for the initialization timed out or was
					// canceled; return an error directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction. An empty but
			// non-nil ID makes other callers wait instead of starting their
			// own initialization.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			//
			// TODO: consider adding a transaction option which allow queries to
			//  initiate transactions by themselves. Note that this option might
			//  not be always good because the ID of the new transaction won't
			//  be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}

			// If t.begin() succeeded, t.state should have been changed to
			// txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Any other state value is not expected here.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
569
570func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
571	t.mu.Lock()
572	defer t.mu.Unlock()
573	if t.rts.IsZero() {
574		t.rts = ts
575	}
576}
577
578// release implements txReadEnv.release.
579func (t *ReadOnlyTransaction) release(err error) {
580	t.mu.Lock()
581	sh := t.sh
582	t.mu.Unlock()
583	if sh != nil { // sh could be nil if t.acquire() fails.
584		if isSessionNotFoundError(err) {
585			sh.destroy()
586		}
587		if t.singleUse {
588			// If session handle is already destroyed, this becomes a noop.
589			sh.recycle()
590		}
591	}
592}
593
594// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
595// after being closed.
596func (t *ReadOnlyTransaction) Close() {
597	if t.singleUse {
598		return
599	}
600	t.mu.Lock()
601	if t.state != txClosed {
602		t.state = txClosed
603		close(t.txReadyOrClosed)
604	}
605	sh := t.sh
606	t.mu.Unlock()
607	if sh == nil {
608		return
609	}
610	// If session handle is already destroyed, this becomes a noop. If there are
611	// still active queries and if the recycled session is reused before they
612	// complete, Cloud Spanner will cancel them on behalf of the new transaction
613	// on the session.
614	if sh != nil {
615		sh.recycle()
616	}
617}
618
619// Timestamp returns the timestamp chosen to perform reads and queries in this
620// transaction. The value can only be read after some read or query has either
621// returned some data or completed without returning any data.
622func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
623	t.mu.Lock()
624	defer t.mu.Unlock()
625	if t.rts.IsZero() {
626		return t.rts, errRtsUnavailable()
627	}
628	return t.rts, nil
629}
630
631// WithTimestampBound specifies the TimestampBound to use for read or query.
632// This can only be used before the first read or query is invoked. Note:
633// bounded staleness is not available with general ReadOnlyTransactions; use a
634// single-use ReadOnlyTransaction instead.
635//
636// The returned value is the ReadOnlyTransaction so calls can be chained.
637func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
638	t.mu.Lock()
639	defer t.mu.Unlock()
640	if t.state == txNew {
641		// Only allow to set TimestampBound before the first query.
642		t.tb = tb
643	}
644	return t
645}
646
// ReadWriteTransaction provides a locking read-write transaction.
//
// This type of transaction is the only way to write data into Cloud Spanner;
// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
// transactions internally. These transactions rely on pessimistic locking and,
// if necessary, two-phase commit. Locking read-write transactions may abort,
// requiring the application to retry. However, the interface exposed by
// (*Client).ReadWriteTransaction eliminates the need for applications to write
// retry loops explicitly.
//
// Locking transactions may be used to atomically read-modify-write data
// anywhere in a database. This type of transaction is externally consistent.
//
// Clients should attempt to minimize the amount of time a transaction is
// active. Faster transactions commit with higher probability and cause less
// contention. Cloud Spanner attempts to keep read locks active as long as the
// transaction continues to do reads. Long periods of inactivity at the client
// may cause Cloud Spanner to release a transaction's locks and abort it.
//
// Reads performed within a transaction acquire locks on the data being
// read. Writes can only be done at commit time, after all reads have been
// completed. Conceptually, a read-write transaction consists of zero or more
// reads or SQL queries followed by a commit.
//
// See (*Client).ReadWriteTransaction for an example.
//
// Semantics
//
// Cloud Spanner can commit the transaction if all read locks it acquired are
// still valid at commit time, and it is able to acquire write locks for all
// writes. Cloud Spanner can abort the transaction for any reason. If a commit
// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
// not modified any user data in Cloud Spanner.
//
// Unless the transaction commits, Cloud Spanner makes no guarantees about how
// long the transaction's locks were held for. It is an error to use Cloud
// Spanner locks for any sort of mutual exclusion other than between Cloud
// Spanner transactions themselves.
//
// Aborted transactions
//
// Application code does not need to retry explicitly; RunInTransaction will
// automatically retry a transaction if an attempt results in an abort. The lock
// priority of a transaction increases after each prior aborted transaction,
// meaning that the next attempt has a slightly better chance of success than
// before.
//
// Under some circumstances (e.g., many transactions attempting to modify the
// same row(s)), a transaction can abort many times in a short period before
// successfully committing. Thus, it is not a good idea to cap the number of
// retries a transaction can attempt; instead, it is better to limit the total
// amount of wall time spent retrying.
//
// Idle transactions
//
// A transaction is considered idle if it has no outstanding reads or SQL
// queries and has not started a read or SQL query within the last 10
// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
// hold on to locks indefinitely. In that case, the commit will fail with error
// ABORTED.
//
// If this behavior is undesirable, periodically executing a simple SQL query
// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
// idle.
type ReadWriteTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
	// during the initialization of ReadWriteTransaction.
	// NOTE(review): acquire reads tx without holding mu; this relies on begin
	// having completed before any concurrent use of the transaction.
	tx transactionID
	// mu protects concurrent access to the internal states of
	// ReadWriteTransaction.
	mu sync.Mutex
	// state is the current transaction status of the read-write transaction.
	state txState
	// wb is the set of buffered mutations waiting to be committed. It is
	// appended to by BufferWrite and converted to protos by commit, both under
	// mu.
	wb []*Mutation
}
726
727// BufferWrite adds a list of mutations to the set of updates that will be
728// applied when the transaction is committed. It does not actually apply the
729// write until the transaction is committed, so the operation does not block.
730// The effects of the write won't be visible to any reads (including reads done
731// in the same transaction) until the transaction commits.
732//
733// See the example for Client.ReadWriteTransaction.
734func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
735	t.mu.Lock()
736	defer t.mu.Unlock()
737	if t.state == txClosed {
738		return errTxClosed()
739	}
740	if t.state != txActive {
741		return errUnexpectedTxState(t.state)
742	}
743	t.wb = append(t.wb, ms...)
744	return nil
745}
746
747// Update executes a DML statement against the database. It returns the number
748// of affected rows. Update returns an error if the statement is a query.
749// However, the query is executed, and any data read will be validated upon
750// commit.
751func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
752	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
753	defer func() { trace.EndSpan(ctx, err) }()
754	req, sh, err := t.prepareExecuteSQL(ctx, stmt, sppb.ExecuteSqlRequest_NORMAL)
755	if err != nil {
756		return 0, err
757	}
758	resultSet, err := sh.getClient().ExecuteSql(ctx, req)
759	if err != nil {
760		return 0, toSpannerError(err)
761	}
762	if resultSet.Stats == nil {
763		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
764	}
765	return extractRowCount(resultSet.Stats)
766}
767
768// BatchUpdate groups one or more DML statements and sends them to Spanner in a
769// single RPC. This is an efficient way to execute multiple DML statements.
770//
771// A slice of counts is returned, where each count represents the number of
772// affected rows for the given query at the same index. If an error occurs,
773// counts will be returned up to the query that encountered the error.
774func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
775	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
776	defer func() { trace.EndSpan(ctx, err) }()
777
778	sh, ts, err := t.acquire(ctx)
779	if err != nil {
780		return nil, err
781	}
782	// Cloud Spanner will return "Session not found" on bad sessions.
783	sid := sh.getID()
784	if sid == "" {
785		// Might happen if transaction is closed in the middle of a API call.
786		return nil, errSessionClosed(sh)
787	}
788
789	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
790	for _, st := range stmts {
791		params, paramTypes, err := st.convertParams()
792		if err != nil {
793			return nil, err
794		}
795		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
796			Sql:        st.SQL,
797			Params:     params,
798			ParamTypes: paramTypes,
799		})
800	}
801
802	resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{
803		Session:     sh.getID(),
804		Transaction: ts,
805		Statements:  sppbStmts,
806		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
807	})
808	if err != nil {
809		return nil, toSpannerError(err)
810	}
811
812	var counts []int64
813	for _, rs := range resp.ResultSets {
814		count, err := extractRowCount(rs.Stats)
815		if err != nil {
816			return nil, err
817		}
818		counts = append(counts, count)
819	}
820	if resp.Status.Code != 0 {
821		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
822	}
823	return counts, nil
824}
825
826// acquire implements txReadEnv.acquire.
827func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
828	ts := &sppb.TransactionSelector{
829		Selector: &sppb.TransactionSelector_Id{
830			Id: t.tx,
831		},
832	}
833	t.mu.Lock()
834	defer t.mu.Unlock()
835	switch t.state {
836	case txClosed:
837		return nil, nil, errTxClosed()
838	case txActive:
839		return t.sh, ts, nil
840	}
841	return nil, nil, errUnexpectedTxState(t.state)
842}
843
844// release implements txReadEnv.release.
845func (t *ReadWriteTransaction) release(err error) {
846	t.mu.Lock()
847	sh := t.sh
848	t.mu.Unlock()
849	if sh != nil && isSessionNotFoundError(err) {
850		sh.destroy()
851	}
852}
853
854func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
855	res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
856		Session: sid,
857		Options: &sppb.TransactionOptions{
858			Mode: &sppb.TransactionOptions_ReadWrite_{
859				ReadWrite: &sppb.TransactionOptions_ReadWrite{},
860			},
861		},
862	})
863	if err != nil {
864		return nil, err
865	}
866	if res.Id == nil {
867		return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.")
868	}
869	return res.Id, nil
870}
871
872// begin starts a read-write transacton on Cloud Spanner, it is always called
873// before any of the public APIs.
874func (t *ReadWriteTransaction) begin(ctx context.Context) error {
875	if t.tx != nil {
876		t.state = txActive
877		return nil
878	}
879	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
880	if err == nil {
881		t.tx = tx
882		t.state = txActive
883		return nil
884	}
885	if isSessionNotFoundError(err) {
886		t.sh.destroy()
887	}
888	return err
889}
890
891// commit tries to commit a readwrite transaction to Cloud Spanner. It also
892// returns the commit timestamp for the transactions.
893func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
894	var ts time.Time
895	t.mu.Lock()
896	t.state = txClosed // No further operations after commit.
897	mPb, err := mutationsProto(t.wb)
898	t.mu.Unlock()
899	if err != nil {
900		return ts, err
901	}
902	// In case that sessionHandle was destroyed but transaction body fails to
903	// report it.
904	sid, client := t.sh.getID(), t.sh.getClient()
905	if sid == "" || client == nil {
906		return ts, errSessionClosed(t.sh)
907	}
908
909	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
910		Session: sid,
911		Transaction: &sppb.CommitRequest_TransactionId{
912			TransactionId: t.tx,
913		},
914		Mutations: mPb,
915	})
916	if e != nil {
917		return ts, toSpannerErrorWithCommitInfo(e, true)
918	}
919	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
920		ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
921	}
922	if isSessionNotFoundError(err) {
923		t.sh.destroy()
924	}
925	return ts, err
926}
927
928// rollback is called when a commit is aborted or the transaction body runs
929// into error.
930func (t *ReadWriteTransaction) rollback(ctx context.Context) {
931	t.mu.Lock()
932	// Forbid further operations on rollbacked transaction.
933	t.state = txClosed
934	t.mu.Unlock()
935	// In case that sessionHandle was destroyed but transaction body fails to
936	// report it.
937	sid, client := t.sh.getID(), t.sh.getClient()
938	if sid == "" || client == nil {
939		return
940	}
941	err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{
942		Session:       sid,
943		TransactionId: t.tx,
944	})
945	if isSessionNotFoundError(err) {
946		t.sh.destroy()
947	}
948}
949
950// runInTransaction executes f under a read-write transaction context.
951func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
952	var (
953		ts              time.Time
954		err             error
955		errDuringCommit bool
956	)
957	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
958		// Try to commit if transaction body returns no error.
959		ts, err = t.commit(ctx)
960		errDuringCommit = err != nil
961	}
962	if err != nil {
963		if isAbortErr(err) {
964			// Retry the transaction using the same session on ABORT error.
965			// Cloud Spanner will create the new transaction with the previous
966			// one's wound-wait priority.
967			return ts, err
968		}
969		if isSessionNotFoundError(err) {
970			t.sh.destroy()
971			return ts, err
972		}
973		// Rollback the transaction unless the error occurred during the
974		// commit. Executing a rollback after a commit has failed will
975		// otherwise cause an error. Note that transient errors, such as
976		// UNAVAILABLE, are already handled in the gRPC layer and do not show
977		// up here. Context errors (deadline exceeded / canceled) during
978		// commits are also not rolled back.
979		if !errDuringCommit {
980			t.rollback(ctx)
981		}
982		return ts, err
983	}
984	// err == nil, return commit timestamp.
985	return ts, nil
986}
987
988// writeOnlyTransaction provides the most efficient way of doing write-only
989// transactions. It essentially does blind writes to Cloud Spanner.
990type writeOnlyTransaction struct {
991	// sp is the session pool which writeOnlyTransaction uses to get Cloud
992	// Spanner sessions for blind writes.
993	sp *sessionPool
994}
995
996// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
997// unless one of the following happens:
998//
999//     1) Context times out.
1000//     2) An unretryable error (e.g. database not found) occurs.
1001//     3) There is a malformed Mutation object.
1002func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
1003	var (
1004		ts time.Time
1005		sh *sessionHandle
1006	)
1007	mPb, err := mutationsProto(ms)
1008	if err != nil {
1009		// Malformed mutation found, just return the error.
1010		return ts, err
1011	}
1012
1013	// Retry-loop for aborted transactions.
1014	// TODO: Replace with generic retryer.
1015	for {
1016		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1017			// No usable session for doing the commit, take one from pool.
1018			sh, err = t.sp.take(ctx)
1019			if err != nil {
1020				// sessionPool.Take already retries for session
1021				// creations/retrivals.
1022				return ts, err
1023			}
1024			defer sh.recycle()
1025		}
1026		res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
1027			Session: sh.getID(),
1028			Transaction: &sppb.CommitRequest_SingleUseTransaction{
1029				SingleUseTransaction: &sppb.TransactionOptions{
1030					Mode: &sppb.TransactionOptions_ReadWrite_{
1031						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1032					},
1033				},
1034			},
1035			Mutations: mPb,
1036		})
1037		if err != nil && !isAbortErr(err) {
1038			if isSessionNotFoundError(err) {
1039				// Discard the bad session.
1040				sh.destroy()
1041			}
1042			return ts, toSpannerErrorWithCommitInfo(err, true)
1043		} else if err == nil {
1044			if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1045				ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1046			}
1047			break
1048		}
1049	}
1050	return ts, toSpannerError(err)
1051}
1052
1053// isAbortedErr returns true if the error indicates that an gRPC call is
1054// aborted on the server side.
1055func isAbortErr(err error) bool {
1056	if err == nil {
1057		return false
1058	}
1059	if ErrCode(err) == codes.Aborted {
1060		return true
1061	}
1062	return false
1063}
1064