1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	vkit "cloud.google.com/go/spanner/apiv1"
27	"github.com/golang/protobuf/proto"
28	"google.golang.org/api/iterator"
29	sppb "google.golang.org/genproto/googleapis/spanner/v1"
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/status"
32)
33
// transactionID stores a transaction ID which uniquely identifies a transaction
// in Cloud Spanner. The ID is kept as the raw bytes returned by the service.
type transactionID []byte
37
// txReadEnv manages a read-transaction environment consisting of a session
// handle and a transaction selector.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to
	// perform a transactional read.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp sets the transaction's read timestamp.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal
	// with session recycling.
	release(error)
}
50
// txReadOnly contains methods for doing transactional reads.
type txReadOnly struct {
	// read-transaction environment for performing transactional read
	// operations.
	txReadEnv

	// Atomic. Only needed for DML statements, but used for all statements: it
	// is incremented for every request built by prepareExecuteSQL.
	sequenceNumber int64

	// replaceSessionFunc is a function that can be called to replace the
	// session that is used by the transaction. This function should only be
	// defined for single-use transactions that can safely be retried on a
	// different session. All other transactions will set this function to nil.
	replaceSessionFunc func(ctx context.Context) error

	// sp is the session pool for allocating a session to execute the read-only
	// transaction. It is set only once during initialization of the
	// txReadOnly.
	sp *sessionPool
	// sh is the sessionHandle allocated from sp.
	sh *sessionHandle

	// qo provides options for executing a sql query.
	qo QueryOptions

	// txOpts provides options for a transaction.
	txOpts TransactionOptions
}
79
// requestPrioritizer is an internal interface for option types that can
// configure the priority of an RPC. createRequestOptions uses it to build the
// RequestOptions sent with a request.
type requestPrioritizer interface {
	// requestPriority returns the priority that should be used for the RPC.
	requestPriority() sppb.RequestOptions_Priority
}
84
// TransactionOptions provides options for a transaction.
type TransactionOptions struct {
	// CommitOptions are the commit options to use for the transaction.
	CommitOptions CommitOptions

	// CommitPriority is the priority to use for the Commit RPC for the
	// transaction.
	CommitPriority sppb.RequestOptions_Priority
}
93
94func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority {
95	return to.CommitPriority
96}
97
98// errSessionClosed returns error for using a recycled/destroyed session
99func errSessionClosed(sh *sessionHandle) error {
100	return spannerErrorf(codes.FailedPrecondition,
101		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
102}
103
104// Read returns a RowIterator for reading multiple rows from the database.
105func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
106	return t.ReadWithOptions(ctx, table, keys, columns, nil)
107}
108
109// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
110func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
111	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
112}
113
// ReadOptions provides options for reading rows from a database.
type ReadOptions struct {
	// The index to use for reading. If non-empty, you can only read columns
	// that are part of the index key, part of the primary key, or stored in the
	// index due to a STORING clause in the index definition.
	Index string

	// The maximum number of rows to read. A limit value less than 1 means no
	// limit.
	Limit int

	// Priority is the RPC priority to use for the read operation. It is only
	// sent with the request when it differs from PRIORITY_UNSPECIFIED.
	Priority sppb.RequestOptions_Priority
}
128
129func (ro *ReadOptions) requestPriority() sppb.RequestOptions_Priority {
130	return ro.Priority
131}
132
133// ReadWithOptions returns a RowIterator for reading multiple rows from the
134// database. Pass a ReadOptions to modify the read operation.
135func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
136	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
137	defer func() { trace.EndSpan(ctx, ri.err) }()
138	var (
139		sh  *sessionHandle
140		ts  *sppb.TransactionSelector
141		err error
142	)
143	kset, err := keys.keySetProto()
144	if err != nil {
145		return &RowIterator{err: err}
146	}
147	if sh, ts, err = t.acquire(ctx); err != nil {
148		return &RowIterator{err: err}
149	}
150	// Cloud Spanner will return "Session not found" on bad sessions.
151	client := sh.getClient()
152	if client == nil {
153		// Might happen if transaction is closed in the middle of a API call.
154		return &RowIterator{err: errSessionClosed(sh)}
155	}
156	index := ""
157	limit := 0
158	var ro *sppb.RequestOptions
159	if opts != nil {
160		index = opts.Index
161		if opts.Limit > 0 {
162			limit = opts.Limit
163		}
164		ro = createRequestOptions(opts)
165	}
166	return streamWithReplaceSessionFunc(
167		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
168		sh.session.logger,
169		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
170			return client.StreamingRead(ctx,
171				&sppb.ReadRequest{
172					Session:        t.sh.getID(),
173					Transaction:    ts,
174					Table:          table,
175					Index:          index,
176					Columns:        columns,
177					KeySet:         kset,
178					ResumeToken:    resumeToken,
179					Limit:          int64(limit),
180					RequestOptions: ro,
181				})
182		},
183		t.replaceSessionFunc,
184		t.setTimestamp,
185		t.release,
186	)
187}
188
189// errRowNotFound returns error for not being able to read the row identified by
190// key.
191func errRowNotFound(table string, key Key) error {
192	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
193}
194
195// errRowNotFoundByIndex returns error for not being able to read the row by index.
196func errRowNotFoundByIndex(table string, key Key, index string) error {
197	return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
198}
199
200// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index.
201func errMultipleRowsFound(table string, key Key, index string) error {
202	return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
203}
204
205// ReadRow reads a single row from the database.
206//
207// If no row is present with the given key, then ReadRow returns an error where
208// spanner.ErrCode(err) is codes.NotFound.
209func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
210	iter := t.Read(ctx, table, key, columns)
211	defer iter.Stop()
212	row, err := iter.Next()
213	switch err {
214	case iterator.Done:
215		return nil, errRowNotFound(table, key)
216	case nil:
217		return row, nil
218	default:
219		return nil, err
220	}
221}
222
223// ReadRowUsingIndex reads a single row from the database using an index.
224//
225// If no row is present with the given index, then ReadRowUsingIndex returns an
226// error where spanner.ErrCode(err) is codes.NotFound.
227//
228// If more than one row received with the given index, then ReadRowUsingIndex
229// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition.
230func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) {
231	iter := t.ReadUsingIndex(ctx, table, index, key, columns)
232	defer iter.Stop()
233	row, err := iter.Next()
234	switch err {
235	case iterator.Done:
236		return nil, errRowNotFoundByIndex(table, key, index)
237	case nil:
238		// If more than one row found, return an error.
239		_, err := iter.Next()
240		switch err {
241		case iterator.Done:
242			return row, nil
243		case nil:
244			return nil, errMultipleRowsFound(table, key, index)
245		default:
246			return nil, err
247		}
248	default:
249		return nil, err
250	}
251}
252
// QueryOptions provides options for executing a sql query or update statement.
type QueryOptions struct {
	// Mode is the query mode to execute the statement with. When nil,
	// prepareExecuteSQL falls back to ExecuteSqlRequest_NORMAL.
	Mode *sppb.ExecuteSqlRequest_QueryMode
	// Options are passed through as the QueryOptions of the ExecuteSqlRequest.
	Options *sppb.ExecuteSqlRequest_QueryOptions

	// Priority is the RPC priority to use for the query/update.
	Priority sppb.RequestOptions_Priority
}
261
262func (qo *QueryOptions) requestPriority() sppb.RequestOptions_Priority {
263	return qo.Priority
264}
265
266// merge combines two QueryOptions that the input parameter will have higher
267// order of precedence.
268func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
269	merged := QueryOptions{
270		Mode:     qo.Mode,
271		Options:  &sppb.ExecuteSqlRequest_QueryOptions{},
272		Priority: qo.Priority,
273	}
274	if opts.Mode != nil {
275		merged.Mode = opts.Mode
276	}
277	if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
278		merged.Priority = opts.Priority
279	}
280	proto.Merge(merged.Options, qo.Options)
281	proto.Merge(merged.Options, opts.Options)
282	return merged
283}
284
285func createRequestOptions(prioritizer requestPrioritizer) (ro *sppb.RequestOptions) {
286	if prioritizer == nil {
287		return nil
288	}
289	if prioritizer.requestPriority() != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
290		ro = &sppb.RequestOptions{Priority: prioritizer.requestPriority()}
291	}
292	return ro
293}
294
295// Query executes a query against the database. It returns a RowIterator for
296// retrieving the resulting rows.
297//
298// Query returns only row data, without a query plan or execution statistics.
299// Use QueryWithStats to get rows along with the plan and statistics. Use
300// AnalyzeQuery to get just the plan.
301func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
302	mode := sppb.ExecuteSqlRequest_NORMAL
303	return t.query(ctx, statement, QueryOptions{
304		Mode:    &mode,
305		Options: t.qo.Options,
306	})
307}
308
309// QueryWithOptions executes a SQL statment against the database. It returns
310// a RowIterator for retrieving the resulting rows. The sql query execution
311// will be optimized based on the given query options.
312func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator {
313	return t.query(ctx, statement, t.qo.merge(opts))
314}
315
316// QueryWithStats executes a SQL statement against the database. It returns
317// a RowIterator for retrieving the resulting rows. The RowIterator will also
318// be populated with a query plan and execution statistics.
319func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
320	mode := sppb.ExecuteSqlRequest_PROFILE
321	return t.query(ctx, statement, QueryOptions{
322		Mode:    &mode,
323		Options: t.qo.Options,
324	})
325}
326
327// AnalyzeQuery returns the query plan for statement.
328func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
329	mode := sppb.ExecuteSqlRequest_PLAN
330	iter := t.query(ctx, statement, QueryOptions{
331		Mode:    &mode,
332		Options: t.qo.Options,
333	})
334	defer iter.Stop()
335	for {
336		_, err := iter.Next()
337		if err == iterator.Done {
338			break
339		}
340		if err != nil {
341			return nil, err
342		}
343	}
344	if iter.QueryPlan == nil {
345		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
346	}
347	return iter.QueryPlan, nil
348}
349
350func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) {
351	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
352	defer func() { trace.EndSpan(ctx, ri.err) }()
353	req, sh, err := t.prepareExecuteSQL(ctx, statement, options)
354	if err != nil {
355		return &RowIterator{err: err}
356	}
357	client := sh.getClient()
358	return streamWithReplaceSessionFunc(
359		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
360		sh.session.logger,
361		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
362			req.ResumeToken = resumeToken
363			req.Session = t.sh.getID()
364			return client.ExecuteStreamingSql(ctx, req)
365		},
366		t.replaceSessionFunc,
367		t.setTimestamp,
368		t.release)
369}
370
371func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
372	sh, ts, err := t.acquire(ctx)
373	if err != nil {
374		return nil, nil, err
375	}
376	// Cloud Spanner will return "Session not found" on bad sessions.
377	sid := sh.getID()
378	if sid == "" {
379		// Might happen if transaction is closed in the middle of a API call.
380		return nil, nil, errSessionClosed(sh)
381	}
382	params, paramTypes, err := stmt.convertParams()
383	if err != nil {
384		return nil, nil, err
385	}
386	mode := sppb.ExecuteSqlRequest_NORMAL
387	if options.Mode != nil {
388		mode = *options.Mode
389	}
390	req := &sppb.ExecuteSqlRequest{
391		Session:        sid,
392		Transaction:    ts,
393		Sql:            stmt.SQL,
394		QueryMode:      mode,
395		Seqno:          atomic.AddInt64(&t.sequenceNumber, 1),
396		Params:         params,
397		ParamTypes:     paramTypes,
398		QueryOptions:   options.Options,
399		RequestOptions: createRequestOptions(&options),
400	}
401	return req, sh, nil
402}
403
// txState is the status of a transaction.
type txState int

const (
	// transaction is new, waiting to be initialized.
	txNew txState = iota
	// transaction is being initialized.
	txInit
	// transaction is active and can perform read/write.
	txActive
	// transaction is closed, cannot be used anymore.
	txClosed
)
417
418// errRtsUnavailable returns error for read transaction's read timestamp being
419// unavailable.
420func errRtsUnavailable() error {
421	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
422}
423
424// errTxClosed returns error for using a closed transaction.
425func errTxClosed() error {
426	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
427}
428
429// errUnexpectedTxState returns error for transaction enters an unexpected state.
430func errUnexpectedTxState(ts txState) error {
431	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
432}
433
434// ReadOnlyTransaction provides a snapshot transaction with guaranteed
435// consistency across reads, but does not allow writes.  Read-only transactions
436// can be configured to read at timestamps in the past.
437//
438// Read-only transactions do not take locks. Instead, they work by choosing a
439// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
440// they do not acquire locks, they do not block concurrent read-write
441// transactions.
442//
443// Unlike locking read-write transactions, read-only transactions never abort.
444// They can fail if the chosen read timestamp is garbage collected; however, the
445// default garbage collection policy is generous enough that most applications
446// do not need to worry about this in practice. See the documentation of
447// TimestampBound for more details.
448//
449// A ReadOnlyTransaction consumes resources on the server until Close is called.
type ReadOnlyTransaction struct {
	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
	mu sync.Mutex
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// singleUse indicates that the transaction can be used for only one read.
	singleUse bool
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadOnlyTransaction. While the state is txInit, a nil tx means that no
	// caller has started the initialization yet; acquireMultiUse sets it to an
	// empty non-nil value to claim the initialization.
	tx transactionID
	// txReadyOrClosed is for broadcasting that transaction ID has been returned
	// by Cloud Spanner or that transaction is closed. begin closes and replaces
	// it when an initialization attempt ends; Close closes it.
	txReadyOrClosed chan struct{}
	// state is the current transaction status of the ReadOnly transaction.
	state txState
	// rts is the read timestamp returned by transactional reads.
	rts time.Time
	// tb is the read staleness bound specification for transactional reads.
	tb TimestampBound
}
470
471// errTxInitTimeout returns error for timeout in waiting for initialization of
472// the transaction.
473func errTxInitTimeout() error {
474	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
475}
476
477// getTimestampBound returns the read staleness bound specified for the
478// ReadOnlyTransaction.
479func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
480	t.mu.Lock()
481	defer t.mu.Unlock()
482	return t.tb
483}
484
485// begin starts a snapshot read-only Transaction on Cloud Spanner.
486func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
487	var (
488		locked bool
489		tx     transactionID
490		rts    time.Time
491		sh     *sessionHandle
492		err    error
493		res    *sppb.Transaction
494	)
495	defer func() {
496		if !locked {
497			t.mu.Lock()
498			// Not necessary, just to make it clear that t.mu is being held when
499			// locked == true.
500			locked = true
501		}
502		if t.state != txClosed {
503			// Signal other initialization routines.
504			close(t.txReadyOrClosed)
505			t.txReadyOrClosed = make(chan struct{})
506		}
507		t.mu.Unlock()
508		if err != nil && sh != nil {
509			// Got a valid session handle, but failed to initialize transaction=
510			// on Cloud Spanner.
511			if isSessionNotFoundError(err) {
512				sh.destroy()
513			}
514			// If sh.destroy was already executed, this becomes a noop.
515			sh.recycle()
516		}
517	}()
518	// Retry the BeginTransaction call if a 'Session not found' is returned.
519	for {
520		sh, err = t.sp.take(ctx)
521		if err != nil {
522			return err
523		}
524		res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
525			Session: sh.getID(),
526			Options: &sppb.TransactionOptions{
527				Mode: &sppb.TransactionOptions_ReadOnly_{
528					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
529				},
530			},
531		})
532		if isSessionNotFoundError(err) {
533			sh.destroy()
534			continue
535		} else if err == nil {
536			tx = res.Id
537			if res.ReadTimestamp != nil {
538				rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
539			}
540		} else {
541			err = ToSpannerError(err)
542		}
543		break
544	}
545	t.mu.Lock()
546
547	// defer function will be executed with t.mu being held.
548	locked = true
549
550	// During the execution of t.begin(), t.Close() was invoked.
551	if t.state == txClosed {
552		return errSessionClosed(sh)
553	}
554
555	// If begin() fails, this allows other queries to take over the
556	// initialization.
557	t.tx = nil
558	if err == nil {
559		t.tx = tx
560		t.rts = rts
561		t.sh = sh
562		// State transite to txActive.
563		t.state = txActive
564	}
565	return err
566}
567
568// acquire implements txReadEnv.acquire.
569func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
570	if err := checkNestedTxn(ctx); err != nil {
571		return nil, nil, err
572	}
573	if t.singleUse {
574		return t.acquireSingleUse(ctx)
575	}
576	return t.acquireMultiUse(ctx)
577}
578
579func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
580	t.mu.Lock()
581	defer t.mu.Unlock()
582	switch t.state {
583	case txClosed:
584		// A closed single-use transaction can never be reused.
585		return nil, nil, errTxClosed()
586	case txNew:
587		t.state = txClosed
588		ts := &sppb.TransactionSelector{
589			Selector: &sppb.TransactionSelector_SingleUse{
590				SingleUse: &sppb.TransactionOptions{
591					Mode: &sppb.TransactionOptions_ReadOnly_{
592						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
593					},
594				},
595			},
596		}
597		sh, err := t.sp.take(ctx)
598		if err != nil {
599			return nil, nil, err
600		}
601
602		// Install session handle into t, which can be used for readonly
603		// operations later.
604		t.sh = sh
605		return sh, ts, nil
606	}
607	us := t.state
608
609	// SingleUse transaction should only be in either txNew state or txClosed
610	// state.
611	return nil, nil, errUnexpectedTxState(us)
612}
613
// acquireMultiUse returns the session handle and transaction selector of a
// multi-use read-only transaction, initializing the transaction on Cloud
// Spanner first if that has not happened yet.
//
// The method loops over the transaction state: a new transaction is moved to
// txInit, exactly one caller then claims the initialization (marked by a
// non-nil t.tx) and calls begin, while other callers wait on txReadyOrClosed
// and re-check the state afterwards. It returns an error if the transaction is
// closed, if ctx is done while waiting for the initialization, or if begin
// fails.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// State transit to txInit so that no further TimestampBound change
			// is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Another caller owns the initialization. Wait for a
				// transaction ID to become ready. The channel is copied while
				// holding the lock because begin replaces it.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// Waiting for the initialization timed out or was
					// canceled; return the error directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction. A non-nil
			// (empty) ID tells other callers to wait.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			//
			// TODO: consider adding a transaction option which allow queries to
			//  initiate transactions by themselves. Note that this option might
			//  not be always good because the ID of the new transaction won't
			//  be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}

			// If t.begin() succeeded, t.state should have been changed to
			// txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Reached only for a state value that is not handled above.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
673
674func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
675	t.mu.Lock()
676	defer t.mu.Unlock()
677	if t.rts.IsZero() {
678		t.rts = ts
679	}
680}
681
682// release implements txReadEnv.release.
683func (t *ReadOnlyTransaction) release(err error) {
684	t.mu.Lock()
685	sh := t.sh
686	t.mu.Unlock()
687	if sh != nil { // sh could be nil if t.acquire() fails.
688		if isSessionNotFoundError(err) {
689			sh.destroy()
690		}
691		if t.singleUse {
692			// If session handle is already destroyed, this becomes a noop.
693			sh.recycle()
694		}
695	}
696}
697
698// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
699// after being closed.
700func (t *ReadOnlyTransaction) Close() {
701	if t.singleUse {
702		return
703	}
704	t.mu.Lock()
705	if t.state != txClosed {
706		t.state = txClosed
707		close(t.txReadyOrClosed)
708	}
709	sh := t.sh
710	t.mu.Unlock()
711	if sh == nil {
712		return
713	}
714	// If session handle is already destroyed, this becomes a noop. If there are
715	// still active queries and if the recycled session is reused before they
716	// complete, Cloud Spanner will cancel them on behalf of the new transaction
717	// on the session.
718	if sh != nil {
719		sh.recycle()
720	}
721}
722
723// Timestamp returns the timestamp chosen to perform reads and queries in this
724// transaction. The value can only be read after some read or query has either
725// returned some data or completed without returning any data.
726func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
727	t.mu.Lock()
728	defer t.mu.Unlock()
729	if t.rts.IsZero() {
730		return t.rts, errRtsUnavailable()
731	}
732	return t.rts, nil
733}
734
735// WithTimestampBound specifies the TimestampBound to use for read or query.
736// This can only be used before the first read or query is invoked. Note:
737// bounded staleness is not available with general ReadOnlyTransactions; use a
738// single-use ReadOnlyTransaction instead.
739//
740// The returned value is the ReadOnlyTransaction so calls can be chained.
741func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
742	t.mu.Lock()
743	defer t.mu.Unlock()
744	if t.state == txNew {
745		// Only allow to set TimestampBound before the first query.
746		t.tb = tb
747	}
748	return t
749}
750
751// ReadWriteTransaction provides a locking read-write transaction.
752//
753// This type of transaction is the only way to write data into Cloud Spanner;
754// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
755// transactions internally. These transactions rely on pessimistic locking and,
756// if necessary, two-phase commit. Locking read-write transactions may abort,
757// requiring the application to retry. However, the interface exposed by
758// (*Client).ReadWriteTransaction eliminates the need for applications to write
759// retry loops explicitly.
760//
761// Locking transactions may be used to atomically read-modify-write data
762// anywhere in a database. This type of transaction is externally consistent.
763//
764// Clients should attempt to minimize the amount of time a transaction is
765// active. Faster transactions commit with higher probability and cause less
766// contention. Cloud Spanner attempts to keep read locks active as long as the
767// transaction continues to do reads.  Long periods of inactivity at the client
768// may cause Cloud Spanner to release a transaction's locks and abort it.
769//
770// Reads performed within a transaction acquire locks on the data being
771// read. Writes can only be done at commit time, after all reads have been
772// completed. Conceptually, a read-write transaction consists of zero or more
773// reads or SQL queries followed by a commit.
774//
775// See (*Client).ReadWriteTransaction for an example.
776//
777// Semantics
778//
779// Cloud Spanner can commit the transaction if all read locks it acquired are
780// still valid at commit time, and it is able to acquire write locks for all
781// writes. Cloud Spanner can abort the transaction for any reason. If a commit
782// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
783// not modified any user data in Cloud Spanner.
784//
785// Unless the transaction commits, Cloud Spanner makes no guarantees about how
786// long the transaction's locks were held for. It is an error to use Cloud
787// Spanner locks for any sort of mutual exclusion other than between Cloud
788// Spanner transactions themselves.
789//
790// Aborted transactions
791//
// Application code does not need to retry explicitly;
// (*Client).ReadWriteTransaction will automatically retry a transaction if an
// attempt results in an abort. The lock
794// priority of a transaction increases after each prior aborted transaction,
795// meaning that the next attempt has a slightly better chance of success than
796// before.
797//
798// Under some circumstances (e.g., many transactions attempting to modify the
799// same row(s)), a transaction can abort many times in a short period before
800// successfully committing. Thus, it is not a good idea to cap the number of
801// retries a transaction can attempt; instead, it is better to limit the total
802// amount of wall time spent retrying.
803//
804// Idle transactions
805//
806// A transaction is considered idle if it has no outstanding reads or SQL
807// queries and has not started a read or SQL query within the last 10
808// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
809// hold on to locks indefinitely. In that case, the commit will fail with error
810// ABORTED.
811//
812// If this behavior is undesirable, periodically executing a simple SQL query
813// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
814// idle.
type ReadWriteTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
	// during the initialization of ReadWriteTransaction.
	tx transactionID
	// mu protects concurrent access to the internal states of
	// ReadWriteTransaction.
	mu sync.Mutex
	// state is the current transaction status of the read-write transaction.
	state txState
	// wb is the set of buffered mutations waiting to be committed. BufferWrite
	// appends to it while the transaction is active.
	wb []*Mutation
}
830
831// BufferWrite adds a list of mutations to the set of updates that will be
832// applied when the transaction is committed. It does not actually apply the
833// write until the transaction is committed, so the operation does not block.
834// The effects of the write won't be visible to any reads (including reads done
835// in the same transaction) until the transaction commits.
836//
837// See the example for Client.ReadWriteTransaction.
838func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
839	t.mu.Lock()
840	defer t.mu.Unlock()
841	if t.state == txClosed {
842		return errTxClosed()
843	}
844	if t.state != txActive {
845		return errUnexpectedTxState(t.state)
846	}
847	t.wb = append(t.wb, ms...)
848	return nil
849}
850
851// Update executes a DML statement against the database. It returns the number
852// of affected rows. Update returns an error if the statement is a query.
853// However, the query is executed, and any data read will be validated upon
854// commit.
855func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
856	mode := sppb.ExecuteSqlRequest_NORMAL
857	return t.update(ctx, stmt, QueryOptions{
858		Mode:    &mode,
859		Options: t.qo.Options,
860	})
861}
862
863// UpdateWithOptions executes a DML statement against the database. It returns
864// the number of affected rows. The given QueryOptions will be used for the
865// execution of this statement.
866func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
867	return t.update(ctx, stmt, t.qo.merge(opts))
868}
869
870func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
871	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
872	defer func() { trace.EndSpan(ctx, err) }()
873	req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts)
874	if err != nil {
875		return 0, err
876	}
877	resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
878	if err != nil {
879		return 0, ToSpannerError(err)
880	}
881	if resultSet.Stats == nil {
882		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
883	}
884	return extractRowCount(resultSet.Stats)
885}
886
887// BatchUpdate groups one or more DML statements and sends them to Spanner in a
888// single RPC. This is an efficient way to execute multiple DML statements.
889//
890// A slice of counts is returned, where each count represents the number of
891// affected rows for the given query at the same index. If an error occurs,
892// counts will be returned up to the query that encountered the error.
893func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
894	return t.BatchUpdateWithOptions(ctx, stmts, QueryOptions{})
895}
896
897// BatchUpdateWithOptions groups one or more DML statements and sends them to
898// Spanner in a single RPC. This is an efficient way to execute multiple DML
899// statements.
900//
901// A slice of counts is returned, where each count represents the number of
902// affected rows for the given query at the same index. If an error occurs,
903// counts will be returned up to the query that encountered the error.
904//
905// The priority given in the QueryOptions will be included with the RPC.
906// Any other options that are set in the QueryOptions struct will be ignored.
907func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) {
908	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
909	defer func() { trace.EndSpan(ctx, err) }()
910
911	sh, ts, err := t.acquire(ctx)
912	if err != nil {
913		return nil, err
914	}
915	// Cloud Spanner will return "Session not found" on bad sessions.
916	sid := sh.getID()
917	if sid == "" {
918		// Might happen if transaction is closed in the middle of a API call.
919		return nil, errSessionClosed(sh)
920	}
921
922	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
923	for _, st := range stmts {
924		params, paramTypes, err := st.convertParams()
925		if err != nil {
926			return nil, err
927		}
928		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
929			Sql:        st.SQL,
930			Params:     params,
931			ParamTypes: paramTypes,
932		})
933	}
934
935	resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{
936		Session:        sh.getID(),
937		Transaction:    ts,
938		Statements:     sppbStmts,
939		Seqno:          atomic.AddInt64(&t.sequenceNumber, 1),
940		RequestOptions: createRequestOptions(&opts),
941	})
942	if err != nil {
943		return nil, ToSpannerError(err)
944	}
945
946	var counts []int64
947	for _, rs := range resp.ResultSets {
948		count, err := extractRowCount(rs.Stats)
949		if err != nil {
950			return nil, err
951		}
952		counts = append(counts, count)
953	}
954	if resp.Status != nil && resp.Status.Code != 0 {
955		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
956	}
957	return counts, nil
958}
959
960// acquire implements txReadEnv.acquire.
961func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
962	ts := &sppb.TransactionSelector{
963		Selector: &sppb.TransactionSelector_Id{
964			Id: t.tx,
965		},
966	}
967	t.mu.Lock()
968	defer t.mu.Unlock()
969	switch t.state {
970	case txClosed:
971		return nil, nil, errTxClosed()
972	case txActive:
973		return t.sh, ts, nil
974	}
975	return nil, nil, errUnexpectedTxState(t.state)
976}
977
978// release implements txReadEnv.release.
979func (t *ReadWriteTransaction) release(err error) {
980	t.mu.Lock()
981	sh := t.sh
982	t.mu.Unlock()
983	if sh != nil && isSessionNotFoundError(err) {
984		sh.destroy()
985	}
986}
987
988func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
989	res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
990		Session: sid,
991		Options: &sppb.TransactionOptions{
992			Mode: &sppb.TransactionOptions_ReadWrite_{
993				ReadWrite: &sppb.TransactionOptions_ReadWrite{},
994			},
995		},
996	})
997	if err != nil {
998		return nil, err
999	}
1000	if res.Id == nil {
1001		return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.")
1002	}
1003	return res.Id, nil
1004}
1005
1006// begin starts a read-write transacton on Cloud Spanner, it is always called
1007// before any of the public APIs.
1008func (t *ReadWriteTransaction) begin(ctx context.Context) error {
1009	if t.tx != nil {
1010		t.state = txActive
1011		return nil
1012	}
1013	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
1014	if err == nil {
1015		t.tx = tx
1016		t.state = txActive
1017		return nil
1018	}
1019	if isSessionNotFoundError(err) {
1020		t.sh.destroy()
1021	}
1022	return err
1023}
1024
// CommitResponse provides a response of a transaction commit in a database.
type CommitResponse struct {
	// CommitTs is the commit time for a transaction. It keeps its zero value
	// when the commit response carries no commit timestamp.
	CommitTs time.Time
	// CommitStats is the commit statistics for a transaction. It is only
	// copied from the commit response when CommitOptions.ReturnCommitStats is
	// set; otherwise it is nil.
	CommitStats *sppb.CommitResponse_CommitStats
}
1032
// CommitOptions provides options for committing a transaction in a database.
type CommitOptions struct {
	// ReturnCommitStats is forwarded as the ReturnCommitStats field of the
	// commit request. When true, the statistics returned by the commit are
	// made available in CommitResponse.CommitStats.
	ReturnCommitStats bool
}
1037
1038// commit tries to commit a readwrite transaction to Cloud Spanner. It also
1039// returns the commit response for the transactions.
1040func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) {
1041	resp := CommitResponse{}
1042	t.mu.Lock()
1043	t.state = txClosed // No further operations after commit.
1044	mPb, err := mutationsProto(t.wb)
1045	t.mu.Unlock()
1046	if err != nil {
1047		return resp, err
1048	}
1049
1050	// In case that sessionHandle was destroyed but transaction body fails to
1051	// report it.
1052	sid, client := t.sh.getID(), t.sh.getClient()
1053	if sid == "" || client == nil {
1054		return resp, errSessionClosed(t.sh)
1055	}
1056
1057	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
1058		Session: sid,
1059		Transaction: &sppb.CommitRequest_TransactionId{
1060			TransactionId: t.tx,
1061		},
1062		RequestOptions:    createRequestOptions(&t.txOpts),
1063		Mutations:         mPb,
1064		ReturnCommitStats: options.ReturnCommitStats,
1065	})
1066	if e != nil {
1067		return resp, toSpannerErrorWithCommitInfo(e, true)
1068	}
1069	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1070		resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1071	}
1072	if options.ReturnCommitStats {
1073		resp.CommitStats = res.CommitStats
1074	}
1075	if isSessionNotFoundError(err) {
1076		t.sh.destroy()
1077	}
1078	return resp, err
1079}
1080
1081// rollback is called when a commit is aborted or the transaction body runs
1082// into error.
1083func (t *ReadWriteTransaction) rollback(ctx context.Context) {
1084	t.mu.Lock()
1085	// Forbid further operations on rollbacked transaction.
1086	t.state = txClosed
1087	t.mu.Unlock()
1088	// In case that sessionHandle was destroyed but transaction body fails to
1089	// report it.
1090	sid, client := t.sh.getID(), t.sh.getClient()
1091	if sid == "" || client == nil {
1092		return
1093	}
1094	err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{
1095		Session:       sid,
1096		TransactionId: t.tx,
1097	})
1098	if isSessionNotFoundError(err) {
1099		t.sh.destroy()
1100	}
1101}
1102
1103// runInTransaction executes f under a read-write transaction context.
1104func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) {
1105	var (
1106		resp            CommitResponse
1107		err             error
1108		errDuringCommit bool
1109	)
1110	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
1111		// Try to commit if transaction body returns no error.
1112		resp, err = t.commit(ctx, t.txOpts.CommitOptions)
1113		errDuringCommit = err != nil
1114	}
1115	if err != nil {
1116		if isAbortedErr(err) {
1117			// Retry the transaction using the same session on ABORT error.
1118			// Cloud Spanner will create the new transaction with the previous
1119			// one's wound-wait priority.
1120			return resp, err
1121		}
1122		if isSessionNotFoundError(err) {
1123			t.sh.destroy()
1124			return resp, err
1125		}
1126		// Rollback the transaction unless the error occurred during the
1127		// commit. Executing a rollback after a commit has failed will
1128		// otherwise cause an error. Note that transient errors, such as
1129		// UNAVAILABLE, are already handled in the gRPC layer and do not show
1130		// up here. Context errors (deadline exceeded / canceled) during
1131		// commits are also not rolled back.
1132		if !errDuringCommit {
1133			t.rollback(ctx)
1134		}
1135		return resp, err
1136	}
1137	// err == nil, return commit response.
1138	return resp, nil
1139}
1140
// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
// order to run a read-write transaction in a statement-based way.
//
// This struct is returned by NewReadWriteStmtBasedTransaction and contains
// Commit() and Rollback() methods to end a transaction.
type ReadWriteStmtBasedTransaction struct {
	// ReadWriteTransaction contains methods for performing transactional reads.
	ReadWriteTransaction

	// options holds transaction options.
	// NOTE(review): NewReadWriteStmtBasedTransactionWithOptions stores the
	// caller's options in the embedded transaction's txOpts and does not
	// assign this field — confirm whether it is still used.
	options TransactionOptions
}
1152
1153// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or
1154// Rollback() must be called to end a transaction. If Commit() or Rollback() is
1155// not called, the session that is used by the transaction will not be returned
1156// to the pool and cause a session leak.
1157//
1158// This method should only be used when manual error handling and retry
1159// management is needed. Cloud Spanner may abort a read/write transaction at any
1160// moment, and each statement that is executed on the transaction should be
1161// checked for an Aborted error, including queries and read operations.
1162//
1163// For most use cases, client.ReadWriteTransaction should be used, as it will
1164// handle all Aborted and 'Session not found' errors automatically.
1165func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
1166	return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{})
1167}
1168
1169// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction
1170// with configurable options. Commit() or Rollback() must be called to end a
1171// transaction. If Commit() or Rollback() is not called, the session that is
1172// used by the transaction will not be returned to the pool and cause a session
1173// leak.
1174//
1175// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
1176// NewReadWriteStmtBasedTransaction.
1177func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
1178	var (
1179		sh  *sessionHandle
1180		err error
1181		t   *ReadWriteStmtBasedTransaction
1182	)
1183	sh, err = c.idleSessions.takeWriteSession(ctx)
1184	if err != nil {
1185		// If session retrieval fails, just fail the transaction.
1186		return nil, err
1187	}
1188	t = &ReadWriteStmtBasedTransaction{
1189		ReadWriteTransaction: ReadWriteTransaction{
1190			tx: sh.getTransactionID(),
1191		},
1192	}
1193	t.txReadOnly.sh = sh
1194	t.txReadOnly.txReadEnv = t
1195	t.txReadOnly.qo = c.qo
1196	t.txOpts = options
1197
1198	if err = t.begin(ctx); err != nil {
1199		if sh != nil {
1200			sh.recycle()
1201		}
1202		return nil, err
1203	}
1204	return t, err
1205}
1206
1207// Commit tries to commit a readwrite transaction to Cloud Spanner. It also
1208// returns the commit timestamp for the transactions.
1209func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
1210	resp, err := t.CommitWithReturnResp(ctx)
1211	return resp.CommitTs, err
1212}
1213
1214// CommitWithReturnResp tries to commit a readwrite transaction. It also returns
1215// the commit timestamp and stats for the transactions.
1216func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context) (CommitResponse, error) {
1217	resp, err := t.commit(ctx, t.txOpts.CommitOptions)
1218	// Rolling back an aborted transaction is not necessary.
1219	if err != nil && status.Code(err) != codes.Aborted {
1220		t.rollback(ctx)
1221	}
1222	if t.sh != nil {
1223		t.sh.recycle()
1224	}
1225	return resp, err
1226}
1227
1228// Rollback is called to cancel the ongoing transaction that has not been
1229// committed yet.
1230func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) {
1231	t.rollback(ctx)
1232	if t.sh != nil {
1233		t.sh.recycle()
1234	}
1235}
1236
// writeOnlyTransaction provides the most efficient way of doing write-only
// transactions. It essentially does blind writes to Cloud Spanner: mutations
// are committed in a single-use read-write transaction without any prior
// reads (see applyAtLeastOnce).
type writeOnlyTransaction struct {
	// sp is the session pool which writeOnlyTransaction uses to get Cloud
	// Spanner sessions for blind writes.
	sp *sessionPool
	// commitPriority is the RPC priority to use for the commit operation. It
	// is exposed through requestPriority.
	commitPriority sppb.RequestOptions_Priority
}
1246
1247func (t *writeOnlyTransaction) requestPriority() sppb.RequestOptions_Priority {
1248	return t.commitPriority
1249}
1250
1251// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
1252// unless one of the following happens:
1253//
1254//     1) Context times out.
1255//     2) An unretryable error (e.g. database not found) occurs.
1256//     3) There is a malformed Mutation object.
1257func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
1258	var (
1259		ts time.Time
1260		sh *sessionHandle
1261	)
1262	mPb, err := mutationsProto(ms)
1263	if err != nil {
1264		// Malformed mutation found, just return the error.
1265		return ts, err
1266	}
1267
1268	// Retry-loop for aborted transactions.
1269	// TODO: Replace with generic retryer.
1270	for {
1271		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1272			// No usable session for doing the commit, take one from pool.
1273			sh, err = t.sp.take(ctx)
1274			if err != nil {
1275				// sessionPool.Take already retries for session
1276				// creations/retrivals.
1277				return ts, err
1278			}
1279			defer sh.recycle()
1280		}
1281		res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
1282			Session: sh.getID(),
1283			Transaction: &sppb.CommitRequest_SingleUseTransaction{
1284				SingleUseTransaction: &sppb.TransactionOptions{
1285					Mode: &sppb.TransactionOptions_ReadWrite_{
1286						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1287					},
1288				},
1289			},
1290			Mutations:      mPb,
1291			RequestOptions: createRequestOptions(t),
1292		})
1293		if err != nil && !isAbortedErr(err) {
1294			if isSessionNotFoundError(err) {
1295				// Discard the bad session.
1296				sh.destroy()
1297			}
1298			return ts, toSpannerErrorWithCommitInfo(err, true)
1299		} else if err == nil {
1300			if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1301				ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1302			}
1303			break
1304		}
1305	}
1306	return ts, ToSpannerError(err)
1307}
1308
1309// isAbortedErr returns true if the error indicates that an gRPC call is
1310// aborted on the server side.
1311func isAbortedErr(err error) bool {
1312	if err == nil {
1313		return false
1314	}
1315	if ErrCode(err) == codes.Aborted {
1316		return true
1317	}
1318	return false
1319}
1320