1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	vkit "cloud.google.com/go/spanner/apiv1"
27	"github.com/golang/protobuf/proto"
28	"google.golang.org/api/iterator"
29	sppb "google.golang.org/genproto/googleapis/spanner/v1"
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/status"
32)
33
34// transactionID stores a transaction ID which uniquely identifies a transaction
35// in Cloud Spanner.
36type transactionID []byte
37
38// txReadEnv manages a read-transaction environment consisting of a session
39// handle and a transaction selector.
40type txReadEnv interface {
41	// acquire returns a read-transaction environment that can be used to
42	// perform a transactional read.
43	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
44	// sets the transaction's read timestamp
45	setTimestamp(time.Time)
46	// release should be called at the end of every transactional read to deal
47	// with session recycling.
48	release(error)
49}
50
51// txReadOnly contains methods for doing transactional reads.
52type txReadOnly struct {
53	// read-transaction environment for performing transactional read
54	// operations.
55	txReadEnv
56
57	// Atomic. Only needed for DML statements, but used forall.
58	sequenceNumber int64
59
60	// replaceSessionFunc is a function that can be called to replace the
61	// session that is used by the transaction. This function should only be
62	// defined for single-use transactions that can safely be retried on a
63	// different session. All other transactions will set this function to nil.
64	replaceSessionFunc func(ctx context.Context) error
65
66	// sp is the session pool for allocating a session to execute the read-only
67	// transaction. It is set only once during initialization of the
68	// txReadOnly.
69	sp *sessionPool
70	// sh is the sessionHandle allocated from sp.
71	sh *sessionHandle
72
73	// qo provides options for executing a sql query.
74	qo QueryOptions
75
76	// txOpts provides options for a transaction.
77	txOpts TransactionOptions
78}
79
80// TransactionOptions provides options for a transaction.
81type TransactionOptions struct {
82	CommitOptions CommitOptions
83}
84
85// errSessionClosed returns error for using a recycled/destroyed session
86func errSessionClosed(sh *sessionHandle) error {
87	return spannerErrorf(codes.FailedPrecondition,
88		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
89}
90
91// Read returns a RowIterator for reading multiple rows from the database.
92func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
93	return t.ReadWithOptions(ctx, table, keys, columns, nil)
94}
95
96// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
97func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
98	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
99}
100
101// ReadOptions provides options for reading rows from a database.
102type ReadOptions struct {
103	// The index to use for reading. If non-empty, you can only read columns
104	// that are part of the index key, part of the primary key, or stored in the
105	// index due to a STORING clause in the index definition.
106	Index string
107
108	// The maximum number of rows to read. A limit value less than 1 means no
109	// limit.
110	Limit int
111}
112
113// ReadWithOptions returns a RowIterator for reading multiple rows from the
114// database. Pass a ReadOptions to modify the read operation.
115func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
116	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
117	defer func() { trace.EndSpan(ctx, ri.err) }()
118	var (
119		sh  *sessionHandle
120		ts  *sppb.TransactionSelector
121		err error
122	)
123	kset, err := keys.keySetProto()
124	if err != nil {
125		return &RowIterator{err: err}
126	}
127	if sh, ts, err = t.acquire(ctx); err != nil {
128		return &RowIterator{err: err}
129	}
130	// Cloud Spanner will return "Session not found" on bad sessions.
131	client := sh.getClient()
132	if client == nil {
133		// Might happen if transaction is closed in the middle of a API call.
134		return &RowIterator{err: errSessionClosed(sh)}
135	}
136	index := ""
137	limit := 0
138	if opts != nil {
139		index = opts.Index
140		if opts.Limit > 0 {
141			limit = opts.Limit
142		}
143	}
144	return streamWithReplaceSessionFunc(
145		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
146		sh.session.logger,
147		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
148			return client.StreamingRead(ctx,
149				&sppb.ReadRequest{
150					Session:     t.sh.getID(),
151					Transaction: ts,
152					Table:       table,
153					Index:       index,
154					Columns:     columns,
155					KeySet:      kset,
156					ResumeToken: resumeToken,
157					Limit:       int64(limit),
158				})
159		},
160		t.replaceSessionFunc,
161		t.setTimestamp,
162		t.release,
163	)
164}
165
166// errRowNotFound returns error for not being able to read the row identified by
167// key.
168func errRowNotFound(table string, key Key) error {
169	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
170}
171
172// errRowNotFoundByIndex returns error for not being able to read the row by index.
173func errRowNotFoundByIndex(table string, key Key, index string) error {
174	return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
175}
176
177// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index.
178func errMultipleRowsFound(table string, key Key, index string) error {
179	return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
180}
181
182// ReadRow reads a single row from the database.
183//
184// If no row is present with the given key, then ReadRow returns an error where
185// spanner.ErrCode(err) is codes.NotFound.
186func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
187	iter := t.Read(ctx, table, key, columns)
188	defer iter.Stop()
189	row, err := iter.Next()
190	switch err {
191	case iterator.Done:
192		return nil, errRowNotFound(table, key)
193	case nil:
194		return row, nil
195	default:
196		return nil, err
197	}
198}
199
200// ReadRowUsingIndex reads a single row from the database using an index.
201//
202// If no row is present with the given index, then ReadRowUsingIndex returns an
203// error where spanner.ErrCode(err) is codes.NotFound.
204//
205// If more than one row received with the given index, then ReadRowUsingIndex
206// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition.
207func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) {
208	iter := t.ReadUsingIndex(ctx, table, index, key, columns)
209	defer iter.Stop()
210	row, err := iter.Next()
211	switch err {
212	case iterator.Done:
213		return nil, errRowNotFoundByIndex(table, key, index)
214	case nil:
215		// If more than one row found, return an error.
216		_, err := iter.Next()
217		switch err {
218		case iterator.Done:
219			return row, nil
220		case nil:
221			return nil, errMultipleRowsFound(table, key, index)
222		default:
223			return nil, err
224		}
225	default:
226		return nil, err
227	}
228}
229
230// QueryOptions provides options for executing a sql query from a database.
231type QueryOptions struct {
232	Mode    *sppb.ExecuteSqlRequest_QueryMode
233	Options *sppb.ExecuteSqlRequest_QueryOptions
234}
235
236// merge combines two QueryOptions that the input parameter will have higher
237// order of precedence.
238func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
239	merged := QueryOptions{
240		Mode:    qo.Mode,
241		Options: &sppb.ExecuteSqlRequest_QueryOptions{},
242	}
243	if opts.Mode != nil {
244		merged.Mode = opts.Mode
245	}
246	proto.Merge(merged.Options, qo.Options)
247	proto.Merge(merged.Options, opts.Options)
248	return merged
249}
250
251// Query executes a query against the database. It returns a RowIterator for
252// retrieving the resulting rows.
253//
254// Query returns only row data, without a query plan or execution statistics.
255// Use QueryWithStats to get rows along with the plan and statistics. Use
256// AnalyzeQuery to get just the plan.
257func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
258	mode := sppb.ExecuteSqlRequest_NORMAL
259	return t.query(ctx, statement, QueryOptions{
260		Mode:    &mode,
261		Options: t.qo.Options,
262	})
263}
264
265// QueryWithOptions executes a SQL statment against the database. It returns
266// a RowIterator for retrieving the resulting rows. The sql query execution
267// will be optimized based on the given query options.
268func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator {
269	return t.query(ctx, statement, t.qo.merge(opts))
270}
271
272// QueryWithStats executes a SQL statement against the database. It returns
273// a RowIterator for retrieving the resulting rows. The RowIterator will also
274// be populated with a query plan and execution statistics.
275func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
276	mode := sppb.ExecuteSqlRequest_PROFILE
277	return t.query(ctx, statement, QueryOptions{
278		Mode:    &mode,
279		Options: t.qo.Options,
280	})
281}
282
283// AnalyzeQuery returns the query plan for statement.
284func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
285	mode := sppb.ExecuteSqlRequest_PLAN
286	iter := t.query(ctx, statement, QueryOptions{
287		Mode:    &mode,
288		Options: t.qo.Options,
289	})
290	defer iter.Stop()
291	for {
292		_, err := iter.Next()
293		if err == iterator.Done {
294			break
295		}
296		if err != nil {
297			return nil, err
298		}
299	}
300	if iter.QueryPlan == nil {
301		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
302	}
303	return iter.QueryPlan, nil
304}
305
306func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) {
307	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
308	defer func() { trace.EndSpan(ctx, ri.err) }()
309	req, sh, err := t.prepareExecuteSQL(ctx, statement, options)
310	if err != nil {
311		return &RowIterator{err: err}
312	}
313	client := sh.getClient()
314	return streamWithReplaceSessionFunc(
315		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
316		sh.session.logger,
317		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
318			req.ResumeToken = resumeToken
319			req.Session = t.sh.getID()
320			return client.ExecuteStreamingSql(ctx, req)
321		},
322		t.replaceSessionFunc,
323		t.setTimestamp,
324		t.release)
325}
326
327func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
328	sh, ts, err := t.acquire(ctx)
329	if err != nil {
330		return nil, nil, err
331	}
332	// Cloud Spanner will return "Session not found" on bad sessions.
333	sid := sh.getID()
334	if sid == "" {
335		// Might happen if transaction is closed in the middle of a API call.
336		return nil, nil, errSessionClosed(sh)
337	}
338	params, paramTypes, err := stmt.convertParams()
339	if err != nil {
340		return nil, nil, err
341	}
342	mode := sppb.ExecuteSqlRequest_NORMAL
343	if options.Mode != nil {
344		mode = *options.Mode
345	}
346	req := &sppb.ExecuteSqlRequest{
347		Session:      sid,
348		Transaction:  ts,
349		Sql:          stmt.SQL,
350		QueryMode:    mode,
351		Seqno:        atomic.AddInt64(&t.sequenceNumber, 1),
352		Params:       params,
353		ParamTypes:   paramTypes,
354		QueryOptions: options.Options,
355	}
356	return req, sh, nil
357}
358
359// txState is the status of a transaction.
360type txState int
361
362const (
363	// transaction is new, waiting to be initialized..
364	txNew txState = iota
365	// transaction is being initialized.
366	txInit
367	// transaction is active and can perform read/write.
368	txActive
369	// transaction is closed, cannot be used anymore.
370	txClosed
371)
372
373// errRtsUnavailable returns error for read transaction's read timestamp being
374// unavailable.
375func errRtsUnavailable() error {
376	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
377}
378
379// errTxClosed returns error for using a closed transaction.
380func errTxClosed() error {
381	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
382}
383
384// errUnexpectedTxState returns error for transaction enters an unexpected state.
385func errUnexpectedTxState(ts txState) error {
386	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
387}
388
389// ReadOnlyTransaction provides a snapshot transaction with guaranteed
390// consistency across reads, but does not allow writes.  Read-only transactions
391// can be configured to read at timestamps in the past.
392//
393// Read-only transactions do not take locks. Instead, they work by choosing a
394// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
395// they do not acquire locks, they do not block concurrent read-write
396// transactions.
397//
398// Unlike locking read-write transactions, read-only transactions never abort.
399// They can fail if the chosen read timestamp is garbage collected; however, the
400// default garbage collection policy is generous enough that most applications
401// do not need to worry about this in practice. See the documentation of
402// TimestampBound for more details.
403//
404// A ReadOnlyTransaction consumes resources on the server until Close is called.
405type ReadOnlyTransaction struct {
406	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
407	mu sync.Mutex
408	// txReadOnly contains methods for performing transactional reads.
409	txReadOnly
410	// singleUse indicates that the transaction can be used for only one read.
411	singleUse bool
412	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
413	// ReadOnlyTransaction.
414	tx transactionID
415	// txReadyOrClosed is for broadcasting that transaction ID has been returned
416	// by Cloud Spanner or that transaction is closed.
417	txReadyOrClosed chan struct{}
418	// state is the current transaction status of the ReadOnly transaction.
419	state txState
420	// rts is the read timestamp returned by transactional reads.
421	rts time.Time
422	// tb is the read staleness bound specification for transactional reads.
423	tb TimestampBound
424}
425
426// errTxInitTimeout returns error for timeout in waiting for initialization of
427// the transaction.
428func errTxInitTimeout() error {
429	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
430}
431
432// getTimestampBound returns the read staleness bound specified for the
433// ReadOnlyTransaction.
434func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
435	t.mu.Lock()
436	defer t.mu.Unlock()
437	return t.tb
438}
439
440// begin starts a snapshot read-only Transaction on Cloud Spanner.
441func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
442	var (
443		locked bool
444		tx     transactionID
445		rts    time.Time
446		sh     *sessionHandle
447		err    error
448		res    *sppb.Transaction
449	)
450	defer func() {
451		if !locked {
452			t.mu.Lock()
453			// Not necessary, just to make it clear that t.mu is being held when
454			// locked == true.
455			locked = true
456		}
457		if t.state != txClosed {
458			// Signal other initialization routines.
459			close(t.txReadyOrClosed)
460			t.txReadyOrClosed = make(chan struct{})
461		}
462		t.mu.Unlock()
463		if err != nil && sh != nil {
464			// Got a valid session handle, but failed to initialize transaction=
465			// on Cloud Spanner.
466			if isSessionNotFoundError(err) {
467				sh.destroy()
468			}
469			// If sh.destroy was already executed, this becomes a noop.
470			sh.recycle()
471		}
472	}()
473	// Retry the BeginTransaction call if a 'Session not found' is returned.
474	for {
475		sh, err = t.sp.take(ctx)
476		if err != nil {
477			return err
478		}
479		res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
480			Session: sh.getID(),
481			Options: &sppb.TransactionOptions{
482				Mode: &sppb.TransactionOptions_ReadOnly_{
483					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
484				},
485			},
486		})
487		if isSessionNotFoundError(err) {
488			sh.destroy()
489			continue
490		} else if err == nil {
491			tx = res.Id
492			if res.ReadTimestamp != nil {
493				rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
494			}
495		} else {
496			err = ToSpannerError(err)
497		}
498		break
499	}
500	t.mu.Lock()
501
502	// defer function will be executed with t.mu being held.
503	locked = true
504
505	// During the execution of t.begin(), t.Close() was invoked.
506	if t.state == txClosed {
507		return errSessionClosed(sh)
508	}
509
510	// If begin() fails, this allows other queries to take over the
511	// initialization.
512	t.tx = nil
513	if err == nil {
514		t.tx = tx
515		t.rts = rts
516		t.sh = sh
517		// State transite to txActive.
518		t.state = txActive
519	}
520	return err
521}
522
523// acquire implements txReadEnv.acquire.
524func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
525	if err := checkNestedTxn(ctx); err != nil {
526		return nil, nil, err
527	}
528	if t.singleUse {
529		return t.acquireSingleUse(ctx)
530	}
531	return t.acquireMultiUse(ctx)
532}
533
534func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
535	t.mu.Lock()
536	defer t.mu.Unlock()
537	switch t.state {
538	case txClosed:
539		// A closed single-use transaction can never be reused.
540		return nil, nil, errTxClosed()
541	case txNew:
542		t.state = txClosed
543		ts := &sppb.TransactionSelector{
544			Selector: &sppb.TransactionSelector_SingleUse{
545				SingleUse: &sppb.TransactionOptions{
546					Mode: &sppb.TransactionOptions_ReadOnly_{
547						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
548					},
549				},
550			},
551		}
552		sh, err := t.sp.take(ctx)
553		if err != nil {
554			return nil, nil, err
555		}
556
557		// Install session handle into t, which can be used for readonly
558		// operations later.
559		t.sh = sh
560		return sh, ts, nil
561	}
562	us := t.state
563
564	// SingleUse transaction should only be in either txNew state or txClosed
565	// state.
566	return nil, nil, errUnexpectedTxState(us)
567}
568
569func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
570	for {
571		t.mu.Lock()
572		switch t.state {
573		case txClosed:
574			t.mu.Unlock()
575			return nil, nil, errTxClosed()
576		case txNew:
577			// State transit to txInit so that no further TimestampBound change
578			// is accepted.
579			t.state = txInit
580			t.mu.Unlock()
581			continue
582		case txInit:
583			if t.tx != nil {
584				// Wait for a transaction ID to become ready.
585				txReadyOrClosed := t.txReadyOrClosed
586				t.mu.Unlock()
587				select {
588				case <-txReadyOrClosed:
589					// Need to check transaction state again.
590					continue
591				case <-ctx.Done():
592					// The waiting for initialization is timeout, return error
593					// directly.
594					return nil, nil, errTxInitTimeout()
595				}
596			}
597			// Take the ownership of initializing the transaction.
598			t.tx = transactionID{}
599			t.mu.Unlock()
600			// Begin a read-only transaction.
601			//
602			// TODO: consider adding a transaction option which allow queries to
603			//  initiate transactions by themselves. Note that this option might
604			//  not be always good because the ID of the new transaction won't
605			//  be ready till the query returns some data or completes.
606			if err := t.begin(ctx); err != nil {
607				return nil, nil, err
608			}
609
610			// If t.begin() succeeded, t.state should have been changed to
611			// txActive, so we can just continue here.
612			continue
613		case txActive:
614			sh := t.sh
615			ts := &sppb.TransactionSelector{
616				Selector: &sppb.TransactionSelector_Id{
617					Id: t.tx,
618				},
619			}
620			t.mu.Unlock()
621			return sh, ts, nil
622		}
623		state := t.state
624		t.mu.Unlock()
625		return nil, nil, errUnexpectedTxState(state)
626	}
627}
628
629func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
630	t.mu.Lock()
631	defer t.mu.Unlock()
632	if t.rts.IsZero() {
633		t.rts = ts
634	}
635}
636
637// release implements txReadEnv.release.
638func (t *ReadOnlyTransaction) release(err error) {
639	t.mu.Lock()
640	sh := t.sh
641	t.mu.Unlock()
642	if sh != nil { // sh could be nil if t.acquire() fails.
643		if isSessionNotFoundError(err) {
644			sh.destroy()
645		}
646		if t.singleUse {
647			// If session handle is already destroyed, this becomes a noop.
648			sh.recycle()
649		}
650	}
651}
652
653// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
654// after being closed.
655func (t *ReadOnlyTransaction) Close() {
656	if t.singleUse {
657		return
658	}
659	t.mu.Lock()
660	if t.state != txClosed {
661		t.state = txClosed
662		close(t.txReadyOrClosed)
663	}
664	sh := t.sh
665	t.mu.Unlock()
666	if sh == nil {
667		return
668	}
669	// If session handle is already destroyed, this becomes a noop. If there are
670	// still active queries and if the recycled session is reused before they
671	// complete, Cloud Spanner will cancel them on behalf of the new transaction
672	// on the session.
673	if sh != nil {
674		sh.recycle()
675	}
676}
677
678// Timestamp returns the timestamp chosen to perform reads and queries in this
679// transaction. The value can only be read after some read or query has either
680// returned some data or completed without returning any data.
681func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
682	t.mu.Lock()
683	defer t.mu.Unlock()
684	if t.rts.IsZero() {
685		return t.rts, errRtsUnavailable()
686	}
687	return t.rts, nil
688}
689
690// WithTimestampBound specifies the TimestampBound to use for read or query.
691// This can only be used before the first read or query is invoked. Note:
692// bounded staleness is not available with general ReadOnlyTransactions; use a
693// single-use ReadOnlyTransaction instead.
694//
695// The returned value is the ReadOnlyTransaction so calls can be chained.
696func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
697	t.mu.Lock()
698	defer t.mu.Unlock()
699	if t.state == txNew {
700		// Only allow to set TimestampBound before the first query.
701		t.tb = tb
702	}
703	return t
704}
705
706// ReadWriteTransaction provides a locking read-write transaction.
707//
708// This type of transaction is the only way to write data into Cloud Spanner;
709// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
710// transactions internally. These transactions rely on pessimistic locking and,
711// if necessary, two-phase commit. Locking read-write transactions may abort,
712// requiring the application to retry. However, the interface exposed by
713// (*Client).ReadWriteTransaction eliminates the need for applications to write
714// retry loops explicitly.
715//
716// Locking transactions may be used to atomically read-modify-write data
717// anywhere in a database. This type of transaction is externally consistent.
718//
719// Clients should attempt to minimize the amount of time a transaction is
720// active. Faster transactions commit with higher probability and cause less
721// contention. Cloud Spanner attempts to keep read locks active as long as the
722// transaction continues to do reads.  Long periods of inactivity at the client
723// may cause Cloud Spanner to release a transaction's locks and abort it.
724//
725// Reads performed within a transaction acquire locks on the data being
726// read. Writes can only be done at commit time, after all reads have been
727// completed. Conceptually, a read-write transaction consists of zero or more
728// reads or SQL queries followed by a commit.
729//
730// See (*Client).ReadWriteTransaction for an example.
731//
732// Semantics
733//
734// Cloud Spanner can commit the transaction if all read locks it acquired are
735// still valid at commit time, and it is able to acquire write locks for all
736// writes. Cloud Spanner can abort the transaction for any reason. If a commit
737// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
738// not modified any user data in Cloud Spanner.
739//
740// Unless the transaction commits, Cloud Spanner makes no guarantees about how
741// long the transaction's locks were held for. It is an error to use Cloud
742// Spanner locks for any sort of mutual exclusion other than between Cloud
743// Spanner transactions themselves.
744//
745// Aborted transactions
746//
747// Application code does not need to retry explicitly; RunInTransaction will
748// automatically retry a transaction if an attempt results in an abort. The lock
749// priority of a transaction increases after each prior aborted transaction,
750// meaning that the next attempt has a slightly better chance of success than
751// before.
752//
753// Under some circumstances (e.g., many transactions attempting to modify the
754// same row(s)), a transaction can abort many times in a short period before
755// successfully committing. Thus, it is not a good idea to cap the number of
756// retries a transaction can attempt; instead, it is better to limit the total
757// amount of wall time spent retrying.
758//
759// Idle transactions
760//
761// A transaction is considered idle if it has no outstanding reads or SQL
762// queries and has not started a read or SQL query within the last 10
763// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
764// hold on to locks indefinitely. In that case, the commit will fail with error
765// ABORTED.
766//
767// If this behavior is undesirable, periodically executing a simple SQL query
768// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
769// idle.
770type ReadWriteTransaction struct {
771	// txReadOnly contains methods for performing transactional reads.
772	txReadOnly
773	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
774	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
775	// during the initialization of ReadWriteTransaction.
776	tx transactionID
777	// mu protects concurrent access to the internal states of
778	// ReadWriteTransaction.
779	mu sync.Mutex
780	// state is the current transaction status of the read-write transaction.
781	state txState
782	// wb is the set of buffered mutations waiting to be committed.
783	wb []*Mutation
784}
785
786// BufferWrite adds a list of mutations to the set of updates that will be
787// applied when the transaction is committed. It does not actually apply the
788// write until the transaction is committed, so the operation does not block.
789// The effects of the write won't be visible to any reads (including reads done
790// in the same transaction) until the transaction commits.
791//
792// See the example for Client.ReadWriteTransaction.
793func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
794	t.mu.Lock()
795	defer t.mu.Unlock()
796	if t.state == txClosed {
797		return errTxClosed()
798	}
799	if t.state != txActive {
800		return errUnexpectedTxState(t.state)
801	}
802	t.wb = append(t.wb, ms...)
803	return nil
804}
805
806// Update executes a DML statement against the database. It returns the number
807// of affected rows. Update returns an error if the statement is a query.
808// However, the query is executed, and any data read will be validated upon
809// commit.
810func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
811	mode := sppb.ExecuteSqlRequest_NORMAL
812	return t.update(ctx, stmt, QueryOptions{
813		Mode:    &mode,
814		Options: t.qo.Options,
815	})
816}
817
818// UpdateWithOptions executes a DML statement against the database. It returns
819// the number of affected rows. The sql query execution will be optimized
820// based on the given query options.
821func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
822	return t.update(ctx, stmt, t.qo.merge(opts))
823}
824
825func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
826	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
827	defer func() { trace.EndSpan(ctx, err) }()
828	req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts)
829	if err != nil {
830		return 0, err
831	}
832	resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
833	if err != nil {
834		return 0, ToSpannerError(err)
835	}
836	if resultSet.Stats == nil {
837		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
838	}
839	return extractRowCount(resultSet.Stats)
840}
841
842// BatchUpdate groups one or more DML statements and sends them to Spanner in a
843// single RPC. This is an efficient way to execute multiple DML statements.
844//
845// A slice of counts is returned, where each count represents the number of
846// affected rows for the given query at the same index. If an error occurs,
847// counts will be returned up to the query that encountered the error.
848func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
849	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
850	defer func() { trace.EndSpan(ctx, err) }()
851
852	sh, ts, err := t.acquire(ctx)
853	if err != nil {
854		return nil, err
855	}
856	// Cloud Spanner will return "Session not found" on bad sessions.
857	sid := sh.getID()
858	if sid == "" {
859		// Might happen if transaction is closed in the middle of a API call.
860		return nil, errSessionClosed(sh)
861	}
862
863	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
864	for _, st := range stmts {
865		params, paramTypes, err := st.convertParams()
866		if err != nil {
867			return nil, err
868		}
869		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
870			Sql:        st.SQL,
871			Params:     params,
872			ParamTypes: paramTypes,
873		})
874	}
875
876	resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{
877		Session:     sh.getID(),
878		Transaction: ts,
879		Statements:  sppbStmts,
880		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
881	})
882	if err != nil {
883		return nil, ToSpannerError(err)
884	}
885
886	var counts []int64
887	for _, rs := range resp.ResultSets {
888		count, err := extractRowCount(rs.Stats)
889		if err != nil {
890			return nil, err
891		}
892		counts = append(counts, count)
893	}
894	if resp.Status != nil && resp.Status.Code != 0 {
895		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
896	}
897	return counts, nil
898}
899
900// acquire implements txReadEnv.acquire.
901func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
902	ts := &sppb.TransactionSelector{
903		Selector: &sppb.TransactionSelector_Id{
904			Id: t.tx,
905		},
906	}
907	t.mu.Lock()
908	defer t.mu.Unlock()
909	switch t.state {
910	case txClosed:
911		return nil, nil, errTxClosed()
912	case txActive:
913		return t.sh, ts, nil
914	}
915	return nil, nil, errUnexpectedTxState(t.state)
916}
917
918// release implements txReadEnv.release.
919func (t *ReadWriteTransaction) release(err error) {
920	t.mu.Lock()
921	sh := t.sh
922	t.mu.Unlock()
923	if sh != nil && isSessionNotFoundError(err) {
924		sh.destroy()
925	}
926}
927
928func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
929	res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
930		Session: sid,
931		Options: &sppb.TransactionOptions{
932			Mode: &sppb.TransactionOptions_ReadWrite_{
933				ReadWrite: &sppb.TransactionOptions_ReadWrite{},
934			},
935		},
936	})
937	if err != nil {
938		return nil, err
939	}
940	if res.Id == nil {
941		return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.")
942	}
943	return res.Id, nil
944}
945
946// begin starts a read-write transacton on Cloud Spanner, it is always called
947// before any of the public APIs.
948func (t *ReadWriteTransaction) begin(ctx context.Context) error {
949	if t.tx != nil {
950		t.state = txActive
951		return nil
952	}
953	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
954	if err == nil {
955		t.tx = tx
956		t.state = txActive
957		return nil
958	}
959	if isSessionNotFoundError(err) {
960		t.sh.destroy()
961	}
962	return err
963}
964
965// CommitResponse provides a response of a transaction commit in a database.
966type CommitResponse struct {
967	// CommitTs is the commit time for a transaction.
968	CommitTs time.Time
969	// CommitStats is the commit statistics for a transaction.
970	CommitStats *sppb.CommitResponse_CommitStats
971}
972
973// CommitOptions provides options for commiting a transaction in a database.
974type CommitOptions struct {
975	ReturnCommitStats bool
976}
977
978// commit tries to commit a readwrite transaction to Cloud Spanner. It also
979// returns the commit response for the transactions.
980func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) {
981	resp := CommitResponse{}
982	t.mu.Lock()
983	t.state = txClosed // No further operations after commit.
984	mPb, err := mutationsProto(t.wb)
985	t.mu.Unlock()
986	if err != nil {
987		return resp, err
988	}
989	// In case that sessionHandle was destroyed but transaction body fails to
990	// report it.
991	sid, client := t.sh.getID(), t.sh.getClient()
992	if sid == "" || client == nil {
993		return resp, errSessionClosed(t.sh)
994	}
995
996	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
997		Session: sid,
998		Transaction: &sppb.CommitRequest_TransactionId{
999			TransactionId: t.tx,
1000		},
1001		Mutations:         mPb,
1002		ReturnCommitStats: options.ReturnCommitStats,
1003	})
1004	if e != nil {
1005		return resp, toSpannerErrorWithCommitInfo(e, true)
1006	}
1007	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1008		resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1009	}
1010	if options.ReturnCommitStats {
1011		resp.CommitStats = res.CommitStats
1012	}
1013	if isSessionNotFoundError(err) {
1014		t.sh.destroy()
1015	}
1016	return resp, err
1017}
1018
1019// rollback is called when a commit is aborted or the transaction body runs
1020// into error.
1021func (t *ReadWriteTransaction) rollback(ctx context.Context) {
1022	t.mu.Lock()
1023	// Forbid further operations on rollbacked transaction.
1024	t.state = txClosed
1025	t.mu.Unlock()
1026	// In case that sessionHandle was destroyed but transaction body fails to
1027	// report it.
1028	sid, client := t.sh.getID(), t.sh.getClient()
1029	if sid == "" || client == nil {
1030		return
1031	}
1032	err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{
1033		Session:       sid,
1034		TransactionId: t.tx,
1035	})
1036	if isSessionNotFoundError(err) {
1037		t.sh.destroy()
1038	}
1039}
1040
1041// runInTransaction executes f under a read-write transaction context.
1042func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) {
1043	var (
1044		resp            CommitResponse
1045		err             error
1046		errDuringCommit bool
1047	)
1048	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
1049		// Try to commit if transaction body returns no error.
1050		resp, err = t.commit(ctx, t.txOpts.CommitOptions)
1051		errDuringCommit = err != nil
1052	}
1053	if err != nil {
1054		if isAbortedErr(err) {
1055			// Retry the transaction using the same session on ABORT error.
1056			// Cloud Spanner will create the new transaction with the previous
1057			// one's wound-wait priority.
1058			return resp, err
1059		}
1060		if isSessionNotFoundError(err) {
1061			t.sh.destroy()
1062			return resp, err
1063		}
1064		// Rollback the transaction unless the error occurred during the
1065		// commit. Executing a rollback after a commit has failed will
1066		// otherwise cause an error. Note that transient errors, such as
1067		// UNAVAILABLE, are already handled in the gRPC layer and do not show
1068		// up here. Context errors (deadline exceeded / canceled) during
1069		// commits are also not rolled back.
1070		if !errDuringCommit {
1071			t.rollback(ctx)
1072		}
1073		return resp, err
1074	}
1075	// err == nil, return commit response.
1076	return resp, nil
1077}
1078
1079// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
1080// order to run a read-write transaction in a statement-based way.
1081//
1082// This struct is returned by NewReadWriteStmtBasedTransaction and contains
1083// Commit() and Rollback() methods to end a transaction.
1084type ReadWriteStmtBasedTransaction struct {
1085	// ReadWriteTransaction contains methods for performing transactional reads.
1086	ReadWriteTransaction
1087
1088	options TransactionOptions
1089}
1090
1091// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or
1092// Rollback() must be called to end a transaction. If Commit() or Rollback() is
1093// not called, the session that is used by the transaction will not be returned
1094// to the pool and cause a session leak.
1095//
1096// This method should only be used when manual error handling and retry
1097// management is needed. Cloud Spanner may abort a read/write transaction at any
1098// moment, and each statement that is executed on the transaction should be
1099// checked for an Aborted error, including queries and read operations.
1100//
1101// For most use cases, client.ReadWriteTransaction should be used, as it will
1102// handle all Aborted and 'Session not found' errors automatically.
1103func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
1104	return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{})
1105}
1106
1107// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction
1108// with configurable options. Commit() or Rollback() must be called to end a
1109// transaction. If Commit() or Rollback() is not called, the session that is
1110// used by the transaction will not be returned to the pool and cause a session
1111// leak.
1112//
1113// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
1114// NewReadWriteStmtBasedTransaction.
1115func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
1116	var (
1117		sh  *sessionHandle
1118		err error
1119		t   *ReadWriteStmtBasedTransaction
1120	)
1121	sh, err = c.idleSessions.takeWriteSession(ctx)
1122	if err != nil {
1123		// If session retrieval fails, just fail the transaction.
1124		return nil, err
1125	}
1126	t = &ReadWriteStmtBasedTransaction{
1127		ReadWriteTransaction: ReadWriteTransaction{
1128			tx: sh.getTransactionID(),
1129		},
1130	}
1131	t.txReadOnly.sh = sh
1132	t.txReadOnly.txReadEnv = t
1133	t.txReadOnly.qo = c.qo
1134	t.txOpts = options
1135
1136	if err = t.begin(ctx); err != nil {
1137		if sh != nil {
1138			sh.recycle()
1139		}
1140		return nil, err
1141	}
1142	return t, err
1143}
1144
1145// Commit tries to commit a readwrite transaction to Cloud Spanner. It also
1146// returns the commit timestamp for the transactions.
1147func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
1148	resp, err := t.CommitWithReturnResp(ctx)
1149	return resp.CommitTs, err
1150}
1151
1152// CommitWithReturnResp tries to commit a readwrite transaction. It also returns
1153// the commit timestamp and stats for the transactions.
1154func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context) (CommitResponse, error) {
1155	resp, err := t.commit(ctx, t.txOpts.CommitOptions)
1156	// Rolling back an aborted transaction is not necessary.
1157	if err != nil && status.Code(err) != codes.Aborted {
1158		t.rollback(ctx)
1159	}
1160	if t.sh != nil {
1161		t.sh.recycle()
1162	}
1163	return resp, err
1164}
1165
1166// Rollback is called to cancel the ongoing transaction that has not been
1167// committed yet.
1168func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) {
1169	t.rollback(ctx)
1170	if t.sh != nil {
1171		t.sh.recycle()
1172	}
1173}
1174
1175// writeOnlyTransaction provides the most efficient way of doing write-only
1176// transactions. It essentially does blind writes to Cloud Spanner.
1177type writeOnlyTransaction struct {
1178	// sp is the session pool which writeOnlyTransaction uses to get Cloud
1179	// Spanner sessions for blind writes.
1180	sp *sessionPool
1181}
1182
1183// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
1184// unless one of the following happens:
1185//
1186//     1) Context times out.
1187//     2) An unretryable error (e.g. database not found) occurs.
1188//     3) There is a malformed Mutation object.
1189func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
1190	var (
1191		ts time.Time
1192		sh *sessionHandle
1193	)
1194	mPb, err := mutationsProto(ms)
1195	if err != nil {
1196		// Malformed mutation found, just return the error.
1197		return ts, err
1198	}
1199
1200	// Retry-loop for aborted transactions.
1201	// TODO: Replace with generic retryer.
1202	for {
1203		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1204			// No usable session for doing the commit, take one from pool.
1205			sh, err = t.sp.take(ctx)
1206			if err != nil {
1207				// sessionPool.Take already retries for session
1208				// creations/retrivals.
1209				return ts, err
1210			}
1211			defer sh.recycle()
1212		}
1213		res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
1214			Session: sh.getID(),
1215			Transaction: &sppb.CommitRequest_SingleUseTransaction{
1216				SingleUseTransaction: &sppb.TransactionOptions{
1217					Mode: &sppb.TransactionOptions_ReadWrite_{
1218						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1219					},
1220				},
1221			},
1222			Mutations: mPb,
1223		})
1224		if err != nil && !isAbortedErr(err) {
1225			if isSessionNotFoundError(err) {
1226				// Discard the bad session.
1227				sh.destroy()
1228			}
1229			return ts, toSpannerErrorWithCommitInfo(err, true)
1230		} else if err == nil {
1231			if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1232				ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1233			}
1234			break
1235		}
1236	}
1237	return ts, ToSpannerError(err)
1238}
1239
1240// isAbortedErr returns true if the error indicates that an gRPC call is
1241// aborted on the server side.
1242func isAbortedErr(err error) bool {
1243	if err == nil {
1244		return false
1245	}
1246	if ErrCode(err) == codes.Aborted {
1247		return true
1248	}
1249	return false
1250}
1251