1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	vkit "cloud.google.com/go/spanner/apiv1"
27	"github.com/golang/protobuf/proto"
28	"google.golang.org/api/iterator"
29	sppb "google.golang.org/genproto/googleapis/spanner/v1"
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/status"
32)
33
// transactionID is the opaque identifier Cloud Spanner assigns to a
// transaction; it uniquely identifies that transaction on the server.
type transactionID []byte
37
38// txReadEnv manages a read-transaction environment consisting of a session
39// handle and a transaction selector.
40type txReadEnv interface {
41	// acquire returns a read-transaction environment that can be used to
42	// perform a transactional read.
43	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
44	// sets the transaction's read timestamp
45	setTimestamp(time.Time)
46	// release should be called at the end of every transactional read to deal
47	// with session recycling.
48	release(error)
49}
50
51// txReadOnly contains methods for doing transactional reads.
52type txReadOnly struct {
53	// read-transaction environment for performing transactional read
54	// operations.
55	txReadEnv
56
57	// Atomic. Only needed for DML statements, but used forall.
58	sequenceNumber int64
59
60	// replaceSessionFunc is a function that can be called to replace the
61	// session that is used by the transaction. This function should only be
62	// defined for single-use transactions that can safely be retried on a
63	// different session. All other transactions will set this function to nil.
64	replaceSessionFunc func(ctx context.Context) error
65
66	// sp is the session pool for allocating a session to execute the read-only
67	// transaction. It is set only once during initialization of the
68	// txReadOnly.
69	sp *sessionPool
70	// sh is the sessionHandle allocated from sp.
71	sh *sessionHandle
72
73	// qo provides options for executing a sql query.
74	qo QueryOptions
75}
76
77// errSessionClosed returns error for using a recycled/destroyed session
78func errSessionClosed(sh *sessionHandle) error {
79	return spannerErrorf(codes.FailedPrecondition,
80		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
81}
82
83// Read returns a RowIterator for reading multiple rows from the database.
84func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
85	return t.ReadWithOptions(ctx, table, keys, columns, nil)
86}
87
88// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
89func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
90	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
91}
92
// ReadOptions tunes a read operation issued with ReadWithOptions.
type ReadOptions struct {
	// Index names a secondary index to read through. When it is non-empty,
	// only columns that belong to the index key, the primary key, or the
	// index's STORING clause can be read.
	Index string

	// Limit caps the number of rows returned. Any value below 1 means the
	// read is unlimited.
	Limit int
}
104
105// ReadWithOptions returns a RowIterator for reading multiple rows from the
106// database. Pass a ReadOptions to modify the read operation.
107func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
108	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
109	defer func() { trace.EndSpan(ctx, ri.err) }()
110	var (
111		sh  *sessionHandle
112		ts  *sppb.TransactionSelector
113		err error
114	)
115	kset, err := keys.keySetProto()
116	if err != nil {
117		return &RowIterator{err: err}
118	}
119	if sh, ts, err = t.acquire(ctx); err != nil {
120		return &RowIterator{err: err}
121	}
122	// Cloud Spanner will return "Session not found" on bad sessions.
123	client := sh.getClient()
124	if client == nil {
125		// Might happen if transaction is closed in the middle of a API call.
126		return &RowIterator{err: errSessionClosed(sh)}
127	}
128	index := ""
129	limit := 0
130	if opts != nil {
131		index = opts.Index
132		if opts.Limit > 0 {
133			limit = opts.Limit
134		}
135	}
136	return streamWithReplaceSessionFunc(
137		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
138		sh.session.logger,
139		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
140			return client.StreamingRead(ctx,
141				&sppb.ReadRequest{
142					Session:     t.sh.getID(),
143					Transaction: ts,
144					Table:       table,
145					Index:       index,
146					Columns:     columns,
147					KeySet:      kset,
148					ResumeToken: resumeToken,
149					Limit:       int64(limit),
150				})
151		},
152		t.replaceSessionFunc,
153		t.setTimestamp,
154		t.release,
155	)
156}
157
158// errRowNotFound returns error for not being able to read the row identified by
159// key.
160func errRowNotFound(table string, key Key) error {
161	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
162}
163
164// errRowNotFoundByIndex returns error for not being able to read the row by index.
165func errRowNotFoundByIndex(table string, key Key, index string) error {
166	return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
167}
168
169// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index.
170func errMultipleRowsFound(table string, key Key, index string) error {
171	return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
172}
173
174// ReadRow reads a single row from the database.
175//
176// If no row is present with the given key, then ReadRow returns an error where
177// spanner.ErrCode(err) is codes.NotFound.
178func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
179	iter := t.Read(ctx, table, key, columns)
180	defer iter.Stop()
181	row, err := iter.Next()
182	switch err {
183	case iterator.Done:
184		return nil, errRowNotFound(table, key)
185	case nil:
186		return row, nil
187	default:
188		return nil, err
189	}
190}
191
192// ReadRowUsingIndex reads a single row from the database using an index.
193//
194// If no row is present with the given index, then ReadRowUsingIndex returns an
195// error where spanner.ErrCode(err) is codes.NotFound.
196//
197// If more than one row received with the given index, then ReadRowUsingIndex
198// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition.
199func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) {
200	iter := t.ReadUsingIndex(ctx, table, index, key, columns)
201	defer iter.Stop()
202	row, err := iter.Next()
203	switch err {
204	case iterator.Done:
205		return nil, errRowNotFoundByIndex(table, key, index)
206	case nil:
207		// If more than one row found, return an error.
208		_, err := iter.Next()
209		switch err {
210		case iterator.Done:
211			return row, nil
212		case nil:
213			return nil, errMultipleRowsFound(table, key, index)
214		default:
215			return nil, err
216		}
217	default:
218		return nil, err
219	}
220}
221
222// QueryOptions provides options for executing a sql query from a database.
223type QueryOptions struct {
224	Mode    *sppb.ExecuteSqlRequest_QueryMode
225	Options *sppb.ExecuteSqlRequest_QueryOptions
226}
227
228// merge combines two QueryOptions that the input parameter will have higher
229// order of precedence.
230func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
231	merged := QueryOptions{
232		Mode:    qo.Mode,
233		Options: &sppb.ExecuteSqlRequest_QueryOptions{},
234	}
235	if opts.Mode != nil {
236		merged.Mode = opts.Mode
237	}
238	proto.Merge(merged.Options, qo.Options)
239	proto.Merge(merged.Options, opts.Options)
240	return merged
241}
242
243// Query executes a query against the database. It returns a RowIterator for
244// retrieving the resulting rows.
245//
246// Query returns only row data, without a query plan or execution statistics.
247// Use QueryWithStats to get rows along with the plan and statistics. Use
248// AnalyzeQuery to get just the plan.
249func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
250	mode := sppb.ExecuteSqlRequest_NORMAL
251	return t.query(ctx, statement, QueryOptions{
252		Mode:    &mode,
253		Options: t.qo.Options,
254	})
255}
256
257// QueryWithOptions executes a SQL statment against the database. It returns
258// a RowIterator for retrieving the resulting rows. The sql query execution
259// will be optimized based on the given query options.
260func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator {
261	return t.query(ctx, statement, t.qo.merge(opts))
262}
263
264// QueryWithStats executes a SQL statement against the database. It returns
265// a RowIterator for retrieving the resulting rows. The RowIterator will also
266// be populated with a query plan and execution statistics.
267func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
268	mode := sppb.ExecuteSqlRequest_PROFILE
269	return t.query(ctx, statement, QueryOptions{
270		Mode:    &mode,
271		Options: t.qo.Options,
272	})
273}
274
275// AnalyzeQuery returns the query plan for statement.
276func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
277	mode := sppb.ExecuteSqlRequest_PLAN
278	iter := t.query(ctx, statement, QueryOptions{
279		Mode:    &mode,
280		Options: t.qo.Options,
281	})
282	defer iter.Stop()
283	for {
284		_, err := iter.Next()
285		if err == iterator.Done {
286			break
287		}
288		if err != nil {
289			return nil, err
290		}
291	}
292	if iter.QueryPlan == nil {
293		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
294	}
295	return iter.QueryPlan, nil
296}
297
298func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) {
299	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
300	defer func() { trace.EndSpan(ctx, ri.err) }()
301	req, sh, err := t.prepareExecuteSQL(ctx, statement, options)
302	if err != nil {
303		return &RowIterator{err: err}
304	}
305	client := sh.getClient()
306	return streamWithReplaceSessionFunc(
307		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
308		sh.session.logger,
309		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
310			req.ResumeToken = resumeToken
311			req.Session = t.sh.getID()
312			return client.ExecuteStreamingSql(ctx, req)
313		},
314		t.replaceSessionFunc,
315		t.setTimestamp,
316		t.release)
317}
318
319func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
320	sh, ts, err := t.acquire(ctx)
321	if err != nil {
322		return nil, nil, err
323	}
324	// Cloud Spanner will return "Session not found" on bad sessions.
325	sid := sh.getID()
326	if sid == "" {
327		// Might happen if transaction is closed in the middle of a API call.
328		return nil, nil, errSessionClosed(sh)
329	}
330	params, paramTypes, err := stmt.convertParams()
331	if err != nil {
332		return nil, nil, err
333	}
334	mode := sppb.ExecuteSqlRequest_NORMAL
335	if options.Mode != nil {
336		mode = *options.Mode
337	}
338	req := &sppb.ExecuteSqlRequest{
339		Session:      sid,
340		Transaction:  ts,
341		Sql:          stmt.SQL,
342		QueryMode:    mode,
343		Seqno:        atomic.AddInt64(&t.sequenceNumber, 1),
344		Params:       params,
345		ParamTypes:   paramTypes,
346		QueryOptions: options.Options,
347	}
348	return req, sh, nil
349}
350
// txState describes where a transaction is in its lifecycle.
type txState int

const (
	// txNew: the transaction has been created but not yet initialized.
	txNew txState = iota
	// txInit: initialization of the transaction is under way.
	txInit
	// txActive: the transaction is ready to serve reads and writes.
	txActive
	// txClosed: the transaction is finished and must not be used again.
	txClosed
)
364
365// errRtsUnavailable returns error for read transaction's read timestamp being
366// unavailable.
367func errRtsUnavailable() error {
368	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
369}
370
371// errTxClosed returns error for using a closed transaction.
372func errTxClosed() error {
373	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
374}
375
376// errUnexpectedTxState returns error for transaction enters an unexpected state.
377func errUnexpectedTxState(ts txState) error {
378	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
379}
380
381// ReadOnlyTransaction provides a snapshot transaction with guaranteed
382// consistency across reads, but does not allow writes.  Read-only transactions
383// can be configured to read at timestamps in the past.
384//
385// Read-only transactions do not take locks. Instead, they work by choosing a
386// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
387// they do not acquire locks, they do not block concurrent read-write
388// transactions.
389//
390// Unlike locking read-write transactions, read-only transactions never abort.
391// They can fail if the chosen read timestamp is garbage collected; however, the
392// default garbage collection policy is generous enough that most applications
393// do not need to worry about this in practice. See the documentation of
394// TimestampBound for more details.
395//
396// A ReadOnlyTransaction consumes resources on the server until Close is called.
397type ReadOnlyTransaction struct {
398	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
399	mu sync.Mutex
400	// txReadOnly contains methods for performing transactional reads.
401	txReadOnly
402	// singleUse indicates that the transaction can be used for only one read.
403	singleUse bool
404	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
405	// ReadOnlyTransaction.
406	tx transactionID
407	// txReadyOrClosed is for broadcasting that transaction ID has been returned
408	// by Cloud Spanner or that transaction is closed.
409	txReadyOrClosed chan struct{}
410	// state is the current transaction status of the ReadOnly transaction.
411	state txState
412	// rts is the read timestamp returned by transactional reads.
413	rts time.Time
414	// tb is the read staleness bound specification for transactional reads.
415	tb TimestampBound
416}
417
418// errTxInitTimeout returns error for timeout in waiting for initialization of
419// the transaction.
420func errTxInitTimeout() error {
421	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
422}
423
424// getTimestampBound returns the read staleness bound specified for the
425// ReadOnlyTransaction.
426func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
427	t.mu.Lock()
428	defer t.mu.Unlock()
429	return t.tb
430}
431
432// begin starts a snapshot read-only Transaction on Cloud Spanner.
433func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
434	var (
435		locked bool
436		tx     transactionID
437		rts    time.Time
438		sh     *sessionHandle
439		err    error
440		res    *sppb.Transaction
441	)
442	defer func() {
443		if !locked {
444			t.mu.Lock()
445			// Not necessary, just to make it clear that t.mu is being held when
446			// locked == true.
447			locked = true
448		}
449		if t.state != txClosed {
450			// Signal other initialization routines.
451			close(t.txReadyOrClosed)
452			t.txReadyOrClosed = make(chan struct{})
453		}
454		t.mu.Unlock()
455		if err != nil && sh != nil {
456			// Got a valid session handle, but failed to initialize transaction=
457			// on Cloud Spanner.
458			if isSessionNotFoundError(err) {
459				sh.destroy()
460			}
461			// If sh.destroy was already executed, this becomes a noop.
462			sh.recycle()
463		}
464	}()
465	// Retry the BeginTransaction call if a 'Session not found' is returned.
466	for {
467		sh, err = t.sp.take(ctx)
468		if err != nil {
469			return err
470		}
471		res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
472			Session: sh.getID(),
473			Options: &sppb.TransactionOptions{
474				Mode: &sppb.TransactionOptions_ReadOnly_{
475					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
476				},
477			},
478		})
479		if isSessionNotFoundError(err) {
480			sh.destroy()
481			continue
482		} else if err == nil {
483			tx = res.Id
484			if res.ReadTimestamp != nil {
485				rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
486			}
487		} else {
488			err = toSpannerError(err)
489		}
490		break
491	}
492	t.mu.Lock()
493
494	// defer function will be executed with t.mu being held.
495	locked = true
496
497	// During the execution of t.begin(), t.Close() was invoked.
498	if t.state == txClosed {
499		return errSessionClosed(sh)
500	}
501
502	// If begin() fails, this allows other queries to take over the
503	// initialization.
504	t.tx = nil
505	if err == nil {
506		t.tx = tx
507		t.rts = rts
508		t.sh = sh
509		// State transite to txActive.
510		t.state = txActive
511	}
512	return err
513}
514
515// acquire implements txReadEnv.acquire.
516func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
517	if err := checkNestedTxn(ctx); err != nil {
518		return nil, nil, err
519	}
520	if t.singleUse {
521		return t.acquireSingleUse(ctx)
522	}
523	return t.acquireMultiUse(ctx)
524}
525
526func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
527	t.mu.Lock()
528	defer t.mu.Unlock()
529	switch t.state {
530	case txClosed:
531		// A closed single-use transaction can never be reused.
532		return nil, nil, errTxClosed()
533	case txNew:
534		t.state = txClosed
535		ts := &sppb.TransactionSelector{
536			Selector: &sppb.TransactionSelector_SingleUse{
537				SingleUse: &sppb.TransactionOptions{
538					Mode: &sppb.TransactionOptions_ReadOnly_{
539						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
540					},
541				},
542			},
543		}
544		sh, err := t.sp.take(ctx)
545		if err != nil {
546			return nil, nil, err
547		}
548
549		// Install session handle into t, which can be used for readonly
550		// operations later.
551		t.sh = sh
552		return sh, ts, nil
553	}
554	us := t.state
555
556	// SingleUse transaction should only be in either txNew state or txClosed
557	// state.
558	return nil, nil, errUnexpectedTxState(us)
559}
560
// acquireMultiUse returns the session handle and a transaction-ID selector for
// a multi-use read-only transaction, calling t.begin first if the transaction
// has not been started on Cloud Spanner yet.
//
// It loops, re-examining t.state under t.mu on every iteration:
//   - txClosed: returns errTxClosed.
//   - txNew: moves to txInit, after which WithTimestampBound no longer changes
//     the bound, then re-examines the state.
//   - txInit: if another caller already owns initialization (t.tx != nil), it
//     waits until t.txReadyOrClosed is closed or ctx is done; otherwise it
//     claims ownership and calls t.begin.
//   - txActive: returns t.sh and a selector carrying t.tx.
//
// Any other state yields errUnexpectedTxState.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// State transit to txInit so that no further TimestampBound change
			// is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Another caller is initializing the transaction. Capture the
				// channel while holding t.mu, then wait on it unlocked.
				// Wait for a transaction ID to become ready.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// The waiting for initialization is timeout, return error
					// directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction. The empty but
			// non-nil ID tells other callers that initialization is in progress.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			//
			// TODO: consider adding a transaction option which allow queries to
			//  initiate transactions by themselves. Note that this option might
			//  not be always good because the ID of the new transaction won't
			//  be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}

			// If t.begin() succeeded, t.state should have been changed to
			// txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Unknown state: report it after releasing the lock.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
620
621func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
622	t.mu.Lock()
623	defer t.mu.Unlock()
624	if t.rts.IsZero() {
625		t.rts = ts
626	}
627}
628
629// release implements txReadEnv.release.
630func (t *ReadOnlyTransaction) release(err error) {
631	t.mu.Lock()
632	sh := t.sh
633	t.mu.Unlock()
634	if sh != nil { // sh could be nil if t.acquire() fails.
635		if isSessionNotFoundError(err) {
636			sh.destroy()
637		}
638		if t.singleUse {
639			// If session handle is already destroyed, this becomes a noop.
640			sh.recycle()
641		}
642	}
643}
644
645// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
646// after being closed.
647func (t *ReadOnlyTransaction) Close() {
648	if t.singleUse {
649		return
650	}
651	t.mu.Lock()
652	if t.state != txClosed {
653		t.state = txClosed
654		close(t.txReadyOrClosed)
655	}
656	sh := t.sh
657	t.mu.Unlock()
658	if sh == nil {
659		return
660	}
661	// If session handle is already destroyed, this becomes a noop. If there are
662	// still active queries and if the recycled session is reused before they
663	// complete, Cloud Spanner will cancel them on behalf of the new transaction
664	// on the session.
665	if sh != nil {
666		sh.recycle()
667	}
668}
669
670// Timestamp returns the timestamp chosen to perform reads and queries in this
671// transaction. The value can only be read after some read or query has either
672// returned some data or completed without returning any data.
673func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
674	t.mu.Lock()
675	defer t.mu.Unlock()
676	if t.rts.IsZero() {
677		return t.rts, errRtsUnavailable()
678	}
679	return t.rts, nil
680}
681
682// WithTimestampBound specifies the TimestampBound to use for read or query.
683// This can only be used before the first read or query is invoked. Note:
684// bounded staleness is not available with general ReadOnlyTransactions; use a
685// single-use ReadOnlyTransaction instead.
686//
687// The returned value is the ReadOnlyTransaction so calls can be chained.
688func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
689	t.mu.Lock()
690	defer t.mu.Unlock()
691	if t.state == txNew {
692		// Only allow to set TimestampBound before the first query.
693		t.tb = tb
694	}
695	return t
696}
697
698// ReadWriteTransaction provides a locking read-write transaction.
699//
700// This type of transaction is the only way to write data into Cloud Spanner;
701// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
702// transactions internally. These transactions rely on pessimistic locking and,
703// if necessary, two-phase commit. Locking read-write transactions may abort,
704// requiring the application to retry. However, the interface exposed by
705// (*Client).ReadWriteTransaction eliminates the need for applications to write
706// retry loops explicitly.
707//
708// Locking transactions may be used to atomically read-modify-write data
709// anywhere in a database. This type of transaction is externally consistent.
710//
711// Clients should attempt to minimize the amount of time a transaction is
712// active. Faster transactions commit with higher probability and cause less
713// contention. Cloud Spanner attempts to keep read locks active as long as the
714// transaction continues to do reads.  Long periods of inactivity at the client
715// may cause Cloud Spanner to release a transaction's locks and abort it.
716//
717// Reads performed within a transaction acquire locks on the data being
718// read. Writes can only be done at commit time, after all reads have been
719// completed. Conceptually, a read-write transaction consists of zero or more
720// reads or SQL queries followed by a commit.
721//
722// See (*Client).ReadWriteTransaction for an example.
723//
724// Semantics
725//
726// Cloud Spanner can commit the transaction if all read locks it acquired are
727// still valid at commit time, and it is able to acquire write locks for all
728// writes. Cloud Spanner can abort the transaction for any reason. If a commit
729// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
730// not modified any user data in Cloud Spanner.
731//
732// Unless the transaction commits, Cloud Spanner makes no guarantees about how
733// long the transaction's locks were held for. It is an error to use Cloud
734// Spanner locks for any sort of mutual exclusion other than between Cloud
735// Spanner transactions themselves.
736//
737// Aborted transactions
738//
739// Application code does not need to retry explicitly; RunInTransaction will
740// automatically retry a transaction if an attempt results in an abort. The lock
741// priority of a transaction increases after each prior aborted transaction,
742// meaning that the next attempt has a slightly better chance of success than
743// before.
744//
745// Under some circumstances (e.g., many transactions attempting to modify the
746// same row(s)), a transaction can abort many times in a short period before
747// successfully committing. Thus, it is not a good idea to cap the number of
748// retries a transaction can attempt; instead, it is better to limit the total
749// amount of wall time spent retrying.
750//
751// Idle transactions
752//
753// A transaction is considered idle if it has no outstanding reads or SQL
754// queries and has not started a read or SQL query within the last 10
755// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
756// hold on to locks indefinitely. In that case, the commit will fail with error
757// ABORTED.
758//
759// If this behavior is undesirable, periodically executing a simple SQL query
760// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
761// idle.
762type ReadWriteTransaction struct {
763	// txReadOnly contains methods for performing transactional reads.
764	txReadOnly
765	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
766	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
767	// during the initialization of ReadWriteTransaction.
768	tx transactionID
769	// mu protects concurrent access to the internal states of
770	// ReadWriteTransaction.
771	mu sync.Mutex
772	// state is the current transaction status of the read-write transaction.
773	state txState
774	// wb is the set of buffered mutations waiting to be committed.
775	wb []*Mutation
776}
777
778// BufferWrite adds a list of mutations to the set of updates that will be
779// applied when the transaction is committed. It does not actually apply the
780// write until the transaction is committed, so the operation does not block.
781// The effects of the write won't be visible to any reads (including reads done
782// in the same transaction) until the transaction commits.
783//
784// See the example for Client.ReadWriteTransaction.
785func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
786	t.mu.Lock()
787	defer t.mu.Unlock()
788	if t.state == txClosed {
789		return errTxClosed()
790	}
791	if t.state != txActive {
792		return errUnexpectedTxState(t.state)
793	}
794	t.wb = append(t.wb, ms...)
795	return nil
796}
797
798// Update executes a DML statement against the database. It returns the number
799// of affected rows. Update returns an error if the statement is a query.
800// However, the query is executed, and any data read will be validated upon
801// commit.
802func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
803	mode := sppb.ExecuteSqlRequest_NORMAL
804	return t.update(ctx, stmt, QueryOptions{
805		Mode:    &mode,
806		Options: t.qo.Options,
807	})
808}
809
810// UpdateWithOptions executes a DML statement against the database. It returns
811// the number of affected rows. The sql query execution will be optimized
812// based on the given query options.
813func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
814	return t.update(ctx, stmt, t.qo.merge(opts))
815}
816
817func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
818	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
819	defer func() { trace.EndSpan(ctx, err) }()
820	req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts)
821	if err != nil {
822		return 0, err
823	}
824	resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
825	if err != nil {
826		return 0, toSpannerError(err)
827	}
828	if resultSet.Stats == nil {
829		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
830	}
831	return extractRowCount(resultSet.Stats)
832}
833
834// BatchUpdate groups one or more DML statements and sends them to Spanner in a
835// single RPC. This is an efficient way to execute multiple DML statements.
836//
837// A slice of counts is returned, where each count represents the number of
838// affected rows for the given query at the same index. If an error occurs,
839// counts will be returned up to the query that encountered the error.
840func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
841	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
842	defer func() { trace.EndSpan(ctx, err) }()
843
844	sh, ts, err := t.acquire(ctx)
845	if err != nil {
846		return nil, err
847	}
848	// Cloud Spanner will return "Session not found" on bad sessions.
849	sid := sh.getID()
850	if sid == "" {
851		// Might happen if transaction is closed in the middle of a API call.
852		return nil, errSessionClosed(sh)
853	}
854
855	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
856	for _, st := range stmts {
857		params, paramTypes, err := st.convertParams()
858		if err != nil {
859			return nil, err
860		}
861		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
862			Sql:        st.SQL,
863			Params:     params,
864			ParamTypes: paramTypes,
865		})
866	}
867
868	resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{
869		Session:     sh.getID(),
870		Transaction: ts,
871		Statements:  sppbStmts,
872		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
873	})
874	if err != nil {
875		return nil, toSpannerError(err)
876	}
877
878	var counts []int64
879	for _, rs := range resp.ResultSets {
880		count, err := extractRowCount(rs.Stats)
881		if err != nil {
882			return nil, err
883		}
884		counts = append(counts, count)
885	}
886	if resp.Status != nil && resp.Status.Code != 0 {
887		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
888	}
889	return counts, nil
890}
891
892// acquire implements txReadEnv.acquire.
893func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
894	ts := &sppb.TransactionSelector{
895		Selector: &sppb.TransactionSelector_Id{
896			Id: t.tx,
897		},
898	}
899	t.mu.Lock()
900	defer t.mu.Unlock()
901	switch t.state {
902	case txClosed:
903		return nil, nil, errTxClosed()
904	case txActive:
905		return t.sh, ts, nil
906	}
907	return nil, nil, errUnexpectedTxState(t.state)
908}
909
910// release implements txReadEnv.release.
911func (t *ReadWriteTransaction) release(err error) {
912	t.mu.Lock()
913	sh := t.sh
914	t.mu.Unlock()
915	if sh != nil && isSessionNotFoundError(err) {
916		sh.destroy()
917	}
918}
919
920func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
921	res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
922		Session: sid,
923		Options: &sppb.TransactionOptions{
924			Mode: &sppb.TransactionOptions_ReadWrite_{
925				ReadWrite: &sppb.TransactionOptions_ReadWrite{},
926			},
927		},
928	})
929	if err != nil {
930		return nil, err
931	}
932	if res.Id == nil {
933		return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.")
934	}
935	return res.Id, nil
936}
937
938// begin starts a read-write transacton on Cloud Spanner, it is always called
939// before any of the public APIs.
940func (t *ReadWriteTransaction) begin(ctx context.Context) error {
941	if t.tx != nil {
942		t.state = txActive
943		return nil
944	}
945	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
946	if err == nil {
947		t.tx = tx
948		t.state = txActive
949		return nil
950	}
951	if isSessionNotFoundError(err) {
952		t.sh.destroy()
953	}
954	return err
955}
956
957// commit tries to commit a readwrite transaction to Cloud Spanner. It also
958// returns the commit timestamp for the transactions.
959func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
960	var ts time.Time
961	t.mu.Lock()
962	t.state = txClosed // No further operations after commit.
963	mPb, err := mutationsProto(t.wb)
964	t.mu.Unlock()
965	if err != nil {
966		return ts, err
967	}
968	// In case that sessionHandle was destroyed but transaction body fails to
969	// report it.
970	sid, client := t.sh.getID(), t.sh.getClient()
971	if sid == "" || client == nil {
972		return ts, errSessionClosed(t.sh)
973	}
974
975	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
976		Session: sid,
977		Transaction: &sppb.CommitRequest_TransactionId{
978			TransactionId: t.tx,
979		},
980		Mutations: mPb,
981	})
982	if e != nil {
983		return ts, toSpannerErrorWithCommitInfo(e, true)
984	}
985	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
986		ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
987	}
988	if isSessionNotFoundError(err) {
989		t.sh.destroy()
990	}
991	return ts, err
992}
993
994// rollback is called when a commit is aborted or the transaction body runs
995// into error.
996func (t *ReadWriteTransaction) rollback(ctx context.Context) {
997	t.mu.Lock()
998	// Forbid further operations on rollbacked transaction.
999	t.state = txClosed
1000	t.mu.Unlock()
1001	// In case that sessionHandle was destroyed but transaction body fails to
1002	// report it.
1003	sid, client := t.sh.getID(), t.sh.getClient()
1004	if sid == "" || client == nil {
1005		return
1006	}
1007	err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{
1008		Session:       sid,
1009		TransactionId: t.tx,
1010	})
1011	if isSessionNotFoundError(err) {
1012		t.sh.destroy()
1013	}
1014}
1015
1016// runInTransaction executes f under a read-write transaction context.
1017func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
1018	var (
1019		ts              time.Time
1020		err             error
1021		errDuringCommit bool
1022	)
1023	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
1024		// Try to commit if transaction body returns no error.
1025		ts, err = t.commit(ctx)
1026		errDuringCommit = err != nil
1027	}
1028	if err != nil {
1029		if isAbortedErr(err) {
1030			// Retry the transaction using the same session on ABORT error.
1031			// Cloud Spanner will create the new transaction with the previous
1032			// one's wound-wait priority.
1033			return ts, err
1034		}
1035		if isSessionNotFoundError(err) {
1036			t.sh.destroy()
1037			return ts, err
1038		}
1039		// Rollback the transaction unless the error occurred during the
1040		// commit. Executing a rollback after a commit has failed will
1041		// otherwise cause an error. Note that transient errors, such as
1042		// UNAVAILABLE, are already handled in the gRPC layer and do not show
1043		// up here. Context errors (deadline exceeded / canceled) during
1044		// commits are also not rolled back.
1045		if !errDuringCommit {
1046			t.rollback(ctx)
1047		}
1048		return ts, err
1049	}
1050	// err == nil, return commit timestamp.
1051	return ts, nil
1052}
1053
// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
// order to run a read-write transaction in a statement-based way.
//
// This struct is returned by NewReadWriteStmtBasedTransaction and contains
// Commit() and Rollback() methods to end a transaction. Unlike
// client.ReadWriteTransaction, nothing here retries Aborted errors; that is
// left to the caller.
type ReadWriteStmtBasedTransaction struct {
	// ReadWriteTransaction is embedded so that its transactional read, query
	// and DML methods are available directly on the statement-based
	// transaction.
	ReadWriteTransaction
}
1063
1064// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or
1065// Rollback() must be called to end a transaction. If Commit() or Rollback() is
1066// not called, the session that is used by the transaction will not be returned
1067// to the pool and cause a session leak.
1068//
1069// This method should only be used when manual error handling and retry
1070// management is needed. Cloud Spanner may abort a read/write transaction at any
1071// moment, and each statement that is executed on the transaction should be
1072// checked for an Aborted error, including queries and read operations.
1073//
1074// For most use cases, client.ReadWriteTransaction should be used, as it will
1075// handle all Aborted and 'Session not found' errors automatically.
1076func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
1077	var (
1078		sh  *sessionHandle
1079		err error
1080		t   *ReadWriteStmtBasedTransaction
1081	)
1082	sh, err = c.idleSessions.takeWriteSession(ctx)
1083	if err != nil {
1084		// If session retrieval fails, just fail the transaction.
1085		return nil, err
1086	}
1087	t = &ReadWriteStmtBasedTransaction{
1088		ReadWriteTransaction: ReadWriteTransaction{
1089			tx: sh.getTransactionID(),
1090		},
1091	}
1092	t.txReadOnly.sh = sh
1093	t.txReadOnly.txReadEnv = t
1094	t.txReadOnly.qo = c.qo
1095
1096	if err = t.begin(ctx); err != nil {
1097		if sh != nil {
1098			sh.recycle()
1099		}
1100		return nil, err
1101	}
1102	return t, err
1103}
1104
1105// Commit tries to commit a readwrite transaction to Cloud Spanner. It also
1106// returns the commit timestamp for the transactions.
1107func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
1108	var (
1109		ts  time.Time
1110		err error
1111	)
1112	ts, err = t.commit(ctx)
1113	// Rolling back an aborted transaction is not necessary.
1114	if err != nil && status.Code(err) != codes.Aborted {
1115		t.rollback(ctx)
1116	}
1117	if t.sh != nil {
1118		t.sh.recycle()
1119	}
1120	return ts, err
1121}
1122
1123// Rollback is called to cancel the ongoing transaction that has not been
1124// committed yet.
1125func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) {
1126	t.rollback(ctx)
1127	if t.sh != nil {
1128		t.sh.recycle()
1129	}
1130}
1131
// writeOnlyTransaction provides the most efficient way of doing write-only
// transactions. It essentially does blind writes to Cloud Spanner: all
// mutations are sent in a single Commit RPC that uses a single-use read-write
// transaction, so no separate BeginTransaction call is made.
type writeOnlyTransaction struct {
	// sp is the session pool which writeOnlyTransaction uses to get Cloud
	// Spanner sessions for blind writes.
	sp *sessionPool
}
1139
1140// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
1141// unless one of the following happens:
1142//
1143//     1) Context times out.
1144//     2) An unretryable error (e.g. database not found) occurs.
1145//     3) There is a malformed Mutation object.
1146func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
1147	var (
1148		ts time.Time
1149		sh *sessionHandle
1150	)
1151	mPb, err := mutationsProto(ms)
1152	if err != nil {
1153		// Malformed mutation found, just return the error.
1154		return ts, err
1155	}
1156
1157	// Retry-loop for aborted transactions.
1158	// TODO: Replace with generic retryer.
1159	for {
1160		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1161			// No usable session for doing the commit, take one from pool.
1162			sh, err = t.sp.take(ctx)
1163			if err != nil {
1164				// sessionPool.Take already retries for session
1165				// creations/retrivals.
1166				return ts, err
1167			}
1168			defer sh.recycle()
1169		}
1170		res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
1171			Session: sh.getID(),
1172			Transaction: &sppb.CommitRequest_SingleUseTransaction{
1173				SingleUseTransaction: &sppb.TransactionOptions{
1174					Mode: &sppb.TransactionOptions_ReadWrite_{
1175						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1176					},
1177				},
1178			},
1179			Mutations: mPb,
1180		})
1181		if err != nil && !isAbortedErr(err) {
1182			if isSessionNotFoundError(err) {
1183				// Discard the bad session.
1184				sh.destroy()
1185			}
1186			return ts, toSpannerErrorWithCommitInfo(err, true)
1187		} else if err == nil {
1188			if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1189				ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1190			}
1191			break
1192		}
1193	}
1194	return ts, toSpannerError(err)
1195}
1196
1197// isAbortedErr returns true if the error indicates that an gRPC call is
1198// aborted on the server side.
1199func isAbortedErr(err error) bool {
1200	if err == nil {
1201		return false
1202	}
1203	if ErrCode(err) == codes.Aborted {
1204		return true
1205	}
1206	return false
1207}
1208