1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	vkit "cloud.google.com/go/spanner/apiv1"
27	"github.com/golang/protobuf/proto"
28	"google.golang.org/api/iterator"
29	sppb "google.golang.org/genproto/googleapis/spanner/v1"
30	"google.golang.org/grpc/codes"
31	"google.golang.org/grpc/status"
32)
33
// transactionID stores a transaction ID which uniquely identifies a transaction
// in Cloud Spanner. The ID is held as a raw byte slice.
type transactionID []byte
37
// txReadEnv manages a read-transaction environment consisting of a session
// handle and a transaction selector.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to
	// perform a transactional read: the session handle to issue the read on
	// and the transaction selector to send along with it.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp sets the transaction's read timestamp.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal
	// with session recycling. The error argument lets the implementation
	// react to how the read ended.
	release(error)
}
50
// txReadOnly contains methods for doing transactional reads.
type txReadOnly struct {
	// read-transaction environment for performing transactional read
	// operations.
	txReadEnv

	// Atomic. Only needed for DML statements, but used for all statements.
	// prepareExecuteSQL increments it with atomic.AddInt64 for every request
	// it builds and sends the new value as the request's Seqno.
	sequenceNumber int64

	// replaceSessionFunc is a function that can be called to replace the
	// session that is used by the transaction. This function should only be
	// defined for single-use transactions that can safely be retried on a
	// different session. All other transactions will set this function to nil.
	replaceSessionFunc func(ctx context.Context) error

	// sp is the session pool for allocating a session to execute the read-only
	// transaction. It is set only once during initialization of the
	// txReadOnly.
	sp *sessionPool
	// sh is the sessionHandle allocated from sp. Its ID is re-read for every
	// (re)started stream in ReadWithOptions and query.
	sh *sessionHandle

	// qo provides options for executing a sql query.
	qo QueryOptions

	// txOpts provides options for a transaction.
	txOpts TransactionOptions
}
79
// TransactionOptions provides options for a transaction.
type TransactionOptions struct {
	// CommitOptions carries the CommitOptions value configured for the
	// transaction.
	CommitOptions CommitOptions

	// The transaction tag to use for a read/write transaction.
	// This tag is automatically included with each statement and the commit
	// request of a read/write transaction.
	TransactionTag string

	// CommitPriority is the priority to use for the Commit RPC for the
	// transaction. It is the value reported by requestPriority.
	CommitPriority sppb.RequestOptions_Priority
}
93
94func (to *TransactionOptions) requestPriority() sppb.RequestOptions_Priority {
95	return to.CommitPriority
96}
97
98func (to *TransactionOptions) requestTag() string {
99	return ""
100}
101
102// errSessionClosed returns error for using a recycled/destroyed session
103func errSessionClosed(sh *sessionHandle) error {
104	return spannerErrorf(codes.FailedPrecondition,
105		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
106}
107
108// Read returns a RowIterator for reading multiple rows from the database.
109func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
110	return t.ReadWithOptions(ctx, table, keys, columns, nil)
111}
112
113// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
114func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
115	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
116}
117
// ReadOptions provides options for reading rows from a database.
type ReadOptions struct {
	// The index to use for reading. If non-empty, you can only read columns
	// that are part of the index key, part of the primary key, or stored in the
	// index due to a STORING clause in the index definition.
	Index string

	// The maximum number of rows to read. A limit value less than 1 means no
	// limit; ReadWithOptions sends a limit of 0 in that case.
	Limit int

	// Priority is the RPC priority to use for the operation.
	Priority sppb.RequestOptions_Priority

	// The request tag to use for this request.
	RequestTag string
}
135
136// ReadWithOptions returns a RowIterator for reading multiple rows from the
137// database. Pass a ReadOptions to modify the read operation.
138func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
139	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
140	defer func() { trace.EndSpan(ctx, ri.err) }()
141	var (
142		sh  *sessionHandle
143		ts  *sppb.TransactionSelector
144		err error
145	)
146	kset, err := keys.keySetProto()
147	if err != nil {
148		return &RowIterator{err: err}
149	}
150	if sh, ts, err = t.acquire(ctx); err != nil {
151		return &RowIterator{err: err}
152	}
153	// Cloud Spanner will return "Session not found" on bad sessions.
154	client := sh.getClient()
155	if client == nil {
156		// Might happen if transaction is closed in the middle of a API call.
157		return &RowIterator{err: errSessionClosed(sh)}
158	}
159	index := ""
160	limit := 0
161	prio := sppb.RequestOptions_PRIORITY_UNSPECIFIED
162	requestTag := ""
163	if opts != nil {
164		index = opts.Index
165		if opts.Limit > 0 {
166			limit = opts.Limit
167		}
168		prio = opts.Priority
169		requestTag = opts.RequestTag
170	}
171	return streamWithReplaceSessionFunc(
172		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
173		sh.session.logger,
174		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
175			return client.StreamingRead(ctx,
176				&sppb.ReadRequest{
177					Session:        t.sh.getID(),
178					Transaction:    ts,
179					Table:          table,
180					Index:          index,
181					Columns:        columns,
182					KeySet:         kset,
183					ResumeToken:    resumeToken,
184					Limit:          int64(limit),
185					RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag),
186				})
187		},
188		t.replaceSessionFunc,
189		t.setTimestamp,
190		t.release,
191	)
192}
193
194// errRowNotFound returns error for not being able to read the row identified by
195// key.
196func errRowNotFound(table string, key Key) error {
197	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
198}
199
200// errRowNotFoundByIndex returns error for not being able to read the row by index.
201func errRowNotFoundByIndex(table string, key Key, index string) error {
202	return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
203}
204
205// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index.
206func errMultipleRowsFound(table string, key Key, index string) error {
207	return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
208}
209
210// ReadRow reads a single row from the database.
211//
212// If no row is present with the given key, then ReadRow returns an error where
213// spanner.ErrCode(err) is codes.NotFound.
214func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
215	iter := t.Read(ctx, table, key, columns)
216	defer iter.Stop()
217	row, err := iter.Next()
218	switch err {
219	case iterator.Done:
220		return nil, errRowNotFound(table, key)
221	case nil:
222		return row, nil
223	default:
224		return nil, err
225	}
226}
227
228// ReadRowUsingIndex reads a single row from the database using an index.
229//
230// If no row is present with the given index, then ReadRowUsingIndex returns an
231// error where spanner.ErrCode(err) is codes.NotFound.
232//
233// If more than one row received with the given index, then ReadRowUsingIndex
234// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition.
235func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) {
236	iter := t.ReadUsingIndex(ctx, table, index, key, columns)
237	defer iter.Stop()
238	row, err := iter.Next()
239	switch err {
240	case iterator.Done:
241		return nil, errRowNotFoundByIndex(table, key, index)
242	case nil:
243		// If more than one row found, return an error.
244		_, err := iter.Next()
245		switch err {
246		case iterator.Done:
247			return row, nil
248		case nil:
249			return nil, errMultipleRowsFound(table, key, index)
250		default:
251			return nil, err
252		}
253	default:
254		return nil, err
255	}
256}
257
// QueryOptions provides options for executing a sql query or update statement.
type QueryOptions struct {
	// Mode is the query mode to execute the statement in. When nil,
	// prepareExecuteSQL falls back to ExecuteSqlRequest_NORMAL.
	Mode *sppb.ExecuteSqlRequest_QueryMode
	// Options is passed through as the QueryOptions of the ExecuteSqlRequest.
	Options *sppb.ExecuteSqlRequest_QueryOptions

	// Priority is the RPC priority to use for the query/update.
	Priority sppb.RequestOptions_Priority

	// The request tag to use for this request.
	RequestTag string
}
269
270// merge combines two QueryOptions that the input parameter will have higher
271// order of precedence.
272func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
273	merged := QueryOptions{
274		Mode:       qo.Mode,
275		Options:    &sppb.ExecuteSqlRequest_QueryOptions{},
276		RequestTag: qo.RequestTag,
277		Priority:   qo.Priority,
278	}
279	if opts.Mode != nil {
280		merged.Mode = opts.Mode
281	}
282	if opts.RequestTag != "" {
283		merged.RequestTag = opts.RequestTag
284	}
285	if opts.Priority != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
286		merged.Priority = opts.Priority
287	}
288	proto.Merge(merged.Options, qo.Options)
289	proto.Merge(merged.Options, opts.Options)
290	return merged
291}
292
293func createRequestOptions(prio sppb.RequestOptions_Priority, requestTag, transactionTag string) (ro *sppb.RequestOptions) {
294	ro = &sppb.RequestOptions{}
295	if prio != sppb.RequestOptions_PRIORITY_UNSPECIFIED {
296		ro.Priority = prio
297	}
298	if requestTag != "" {
299		ro.RequestTag = requestTag
300	}
301	if transactionTag != "" {
302		ro.TransactionTag = transactionTag
303	}
304	return ro
305}
306
307// Query executes a query against the database. It returns a RowIterator for
308// retrieving the resulting rows.
309//
310// Query returns only row data, without a query plan or execution statistics.
311// Use QueryWithStats to get rows along with the plan and statistics. Use
312// AnalyzeQuery to get just the plan.
313func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
314	mode := sppb.ExecuteSqlRequest_NORMAL
315	return t.query(ctx, statement, QueryOptions{
316		Mode:    &mode,
317		Options: t.qo.Options,
318	})
319}
320
321// QueryWithOptions executes a SQL statment against the database. It returns
322// a RowIterator for retrieving the resulting rows. The sql query execution
323// will be optimized based on the given query options.
324func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator {
325	return t.query(ctx, statement, t.qo.merge(opts))
326}
327
328// QueryWithStats executes a SQL statement against the database. It returns
329// a RowIterator for retrieving the resulting rows. The RowIterator will also
330// be populated with a query plan and execution statistics.
331func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
332	mode := sppb.ExecuteSqlRequest_PROFILE
333	return t.query(ctx, statement, QueryOptions{
334		Mode:    &mode,
335		Options: t.qo.Options,
336	})
337}
338
339// AnalyzeQuery returns the query plan for statement.
340func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
341	mode := sppb.ExecuteSqlRequest_PLAN
342	iter := t.query(ctx, statement, QueryOptions{
343		Mode:    &mode,
344		Options: t.qo.Options,
345	})
346	defer iter.Stop()
347	for {
348		_, err := iter.Next()
349		if err == iterator.Done {
350			break
351		}
352		if err != nil {
353			return nil, err
354		}
355	}
356	if iter.QueryPlan == nil {
357		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
358	}
359	return iter.QueryPlan, nil
360}
361
362func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) {
363	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
364	defer func() { trace.EndSpan(ctx, ri.err) }()
365	req, sh, err := t.prepareExecuteSQL(ctx, statement, options)
366	if err != nil {
367		return &RowIterator{err: err}
368	}
369	client := sh.getClient()
370	return streamWithReplaceSessionFunc(
371		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
372		sh.session.logger,
373		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
374			req.ResumeToken = resumeToken
375			req.Session = t.sh.getID()
376			return client.ExecuteStreamingSql(ctx, req)
377		},
378		t.replaceSessionFunc,
379		t.setTimestamp,
380		t.release)
381}
382
383func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
384	sh, ts, err := t.acquire(ctx)
385	if err != nil {
386		return nil, nil, err
387	}
388	// Cloud Spanner will return "Session not found" on bad sessions.
389	sid := sh.getID()
390	if sid == "" {
391		// Might happen if transaction is closed in the middle of a API call.
392		return nil, nil, errSessionClosed(sh)
393	}
394	params, paramTypes, err := stmt.convertParams()
395	if err != nil {
396		return nil, nil, err
397	}
398	mode := sppb.ExecuteSqlRequest_NORMAL
399	if options.Mode != nil {
400		mode = *options.Mode
401	}
402	req := &sppb.ExecuteSqlRequest{
403		Session:        sid,
404		Transaction:    ts,
405		Sql:            stmt.SQL,
406		QueryMode:      mode,
407		Seqno:          atomic.AddInt64(&t.sequenceNumber, 1),
408		Params:         params,
409		ParamTypes:     paramTypes,
410		QueryOptions:   options.Options,
411		RequestOptions: createRequestOptions(options.Priority, options.RequestTag, t.txOpts.TransactionTag),
412	}
413	return req, sh, nil
414}
415
// txState is the status of a transaction.
type txState int

const (
	// transaction is new, waiting to be initialized. This is the zero value
	// of txState.
	txNew txState = iota
	// transaction is being initialized.
	txInit
	// transaction is active and can perform read/write.
	txActive
	// transaction is closed, cannot be used anymore.
	txClosed
)
429
430// errRtsUnavailable returns error for read transaction's read timestamp being
431// unavailable.
432func errRtsUnavailable() error {
433	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
434}
435
436// errTxClosed returns error for using a closed transaction.
437func errTxClosed() error {
438	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
439}
440
441// errUnexpectedTxState returns error for transaction enters an unexpected state.
442func errUnexpectedTxState(ts txState) error {
443	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
444}
445
446// ReadOnlyTransaction provides a snapshot transaction with guaranteed
447// consistency across reads, but does not allow writes.  Read-only transactions
448// can be configured to read at timestamps in the past.
449//
450// Read-only transactions do not take locks. Instead, they work by choosing a
451// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
452// they do not acquire locks, they do not block concurrent read-write
453// transactions.
454//
455// Unlike locking read-write transactions, read-only transactions never abort.
456// They can fail if the chosen read timestamp is garbage collected; however, the
457// default garbage collection policy is generous enough that most applications
458// do not need to worry about this in practice. See the documentation of
459// TimestampBound for more details.
460//
461// A ReadOnlyTransaction consumes resources on the server until Close is called.
type ReadOnlyTransaction struct {
	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
	mu sync.Mutex
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// singleUse indicates that the transaction can be used for only one read.
	singleUse bool
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadOnlyTransaction. While the transaction is in state txInit, a non-nil
	// value marks that some caller already owns the initialization.
	tx transactionID
	// txReadyOrClosed is for broadcasting that transaction ID has been returned
	// by Cloud Spanner or that transaction is closed. The channel is closed to
	// wake up the waiters.
	txReadyOrClosed chan struct{}
	// state is the current transaction status of the ReadOnly transaction.
	state txState
	// rts is the read timestamp returned by transactional reads. The zero
	// time means that no read timestamp is available yet.
	rts time.Time
	// tb is the read staleness bound specification for transactional reads.
	// WithTimestampBound only changes it while the transaction is in state
	// txNew.
	tb TimestampBound
}
482
483// errTxInitTimeout returns error for timeout in waiting for initialization of
484// the transaction.
485func errTxInitTimeout() error {
486	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
487}
488
489// getTimestampBound returns the read staleness bound specified for the
490// ReadOnlyTransaction.
491func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
492	t.mu.Lock()
493	defer t.mu.Unlock()
494	return t.tb
495}
496
497// begin starts a snapshot read-only Transaction on Cloud Spanner.
498func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
499	var (
500		locked bool
501		tx     transactionID
502		rts    time.Time
503		sh     *sessionHandle
504		err    error
505		res    *sppb.Transaction
506	)
507	defer func() {
508		if !locked {
509			t.mu.Lock()
510			// Not necessary, just to make it clear that t.mu is being held when
511			// locked == true.
512			locked = true
513		}
514		if t.state != txClosed {
515			// Signal other initialization routines.
516			close(t.txReadyOrClosed)
517			t.txReadyOrClosed = make(chan struct{})
518		}
519		t.mu.Unlock()
520		if err != nil && sh != nil {
521			// Got a valid session handle, but failed to initialize transaction=
522			// on Cloud Spanner.
523			if isSessionNotFoundError(err) {
524				sh.destroy()
525			}
526			// If sh.destroy was already executed, this becomes a noop.
527			sh.recycle()
528		}
529	}()
530	// Retry the BeginTransaction call if a 'Session not found' is returned.
531	for {
532		sh, err = t.sp.take(ctx)
533		if err != nil {
534			return err
535		}
536		res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
537			Session: sh.getID(),
538			Options: &sppb.TransactionOptions{
539				Mode: &sppb.TransactionOptions_ReadOnly_{
540					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
541				},
542			},
543		})
544		if isSessionNotFoundError(err) {
545			sh.destroy()
546			continue
547		} else if err == nil {
548			tx = res.Id
549			if res.ReadTimestamp != nil {
550				rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
551			}
552		} else {
553			err = ToSpannerError(err)
554		}
555		break
556	}
557	t.mu.Lock()
558
559	// defer function will be executed with t.mu being held.
560	locked = true
561
562	// During the execution of t.begin(), t.Close() was invoked.
563	if t.state == txClosed {
564		return errSessionClosed(sh)
565	}
566
567	// If begin() fails, this allows other queries to take over the
568	// initialization.
569	t.tx = nil
570	if err == nil {
571		t.tx = tx
572		t.rts = rts
573		t.sh = sh
574		// State transite to txActive.
575		t.state = txActive
576	}
577	return err
578}
579
580// acquire implements txReadEnv.acquire.
581func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
582	if err := checkNestedTxn(ctx); err != nil {
583		return nil, nil, err
584	}
585	if t.singleUse {
586		return t.acquireSingleUse(ctx)
587	}
588	return t.acquireMultiUse(ctx)
589}
590
591func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
592	t.mu.Lock()
593	defer t.mu.Unlock()
594	switch t.state {
595	case txClosed:
596		// A closed single-use transaction can never be reused.
597		return nil, nil, errTxClosed()
598	case txNew:
599		t.state = txClosed
600		ts := &sppb.TransactionSelector{
601			Selector: &sppb.TransactionSelector_SingleUse{
602				SingleUse: &sppb.TransactionOptions{
603					Mode: &sppb.TransactionOptions_ReadOnly_{
604						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
605					},
606				},
607			},
608		}
609		sh, err := t.sp.take(ctx)
610		if err != nil {
611			return nil, nil, err
612		}
613
614		// Install session handle into t, which can be used for readonly
615		// operations later.
616		t.sh = sh
617		return sh, ts, nil
618	}
619	us := t.state
620
621	// SingleUse transaction should only be in either txNew state or txClosed
622	// state.
623	return nil, nil, errUnexpectedTxState(us)
624}
625
// acquireMultiUse hands out the read environment of a multi-use read-only
// transaction, initializing the transaction on first use.
//
// The loop re-examines t.state under t.mu on every iteration:
//   - txClosed: fails with errTxClosed.
//   - txNew: moves to txInit and loops again.
//   - txInit: if another caller owns the initialization (t.tx != nil), waits
//     until t.txReadyOrClosed is closed or ctx is done; otherwise claims
//     ownership and calls t.begin.
//   - txActive: returns t.sh and a selector carrying the transaction ID.
//
// Any other state yields errUnexpectedTxState.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// State transit to txInit so that no further TimestampBound change
			// is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Wait for a transaction ID to become ready.
				// Capture the channel while holding t.mu, then wait on the
				// captured value without the lock.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// The waiting for initialization is timeout, return error
					// directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction.
			// The empty, non-nil ID makes other callers take the waiting
			// branch above.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			//
			// TODO: consider adding a transaction option which allow queries to
			//  initiate transactions by themselves. Note that this option might
			//  not be always good because the ID of the new transaction won't
			//  be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}

			// If t.begin() succeeded, t.state should have been changed to
			// txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Reached only for a state not handled by the switch above; t.mu is
		// still held here.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
685
686func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
687	t.mu.Lock()
688	defer t.mu.Unlock()
689	if t.rts.IsZero() {
690		t.rts = ts
691	}
692}
693
694// release implements txReadEnv.release.
695func (t *ReadOnlyTransaction) release(err error) {
696	t.mu.Lock()
697	sh := t.sh
698	t.mu.Unlock()
699	if sh != nil { // sh could be nil if t.acquire() fails.
700		if isSessionNotFoundError(err) {
701			sh.destroy()
702		}
703		if t.singleUse {
704			// If session handle is already destroyed, this becomes a noop.
705			sh.recycle()
706		}
707	}
708}
709
710// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
711// after being closed.
712func (t *ReadOnlyTransaction) Close() {
713	if t.singleUse {
714		return
715	}
716	t.mu.Lock()
717	if t.state != txClosed {
718		t.state = txClosed
719		close(t.txReadyOrClosed)
720	}
721	sh := t.sh
722	t.mu.Unlock()
723	if sh == nil {
724		return
725	}
726	// If session handle is already destroyed, this becomes a noop. If there are
727	// still active queries and if the recycled session is reused before they
728	// complete, Cloud Spanner will cancel them on behalf of the new transaction
729	// on the session.
730	if sh != nil {
731		sh.recycle()
732	}
733}
734
735// Timestamp returns the timestamp chosen to perform reads and queries in this
736// transaction. The value can only be read after some read or query has either
737// returned some data or completed without returning any data.
738func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
739	t.mu.Lock()
740	defer t.mu.Unlock()
741	if t.rts.IsZero() {
742		return t.rts, errRtsUnavailable()
743	}
744	return t.rts, nil
745}
746
747// WithTimestampBound specifies the TimestampBound to use for read or query.
748// This can only be used before the first read or query is invoked. Note:
749// bounded staleness is not available with general ReadOnlyTransactions; use a
750// single-use ReadOnlyTransaction instead.
751//
752// The returned value is the ReadOnlyTransaction so calls can be chained.
753func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
754	t.mu.Lock()
755	defer t.mu.Unlock()
756	if t.state == txNew {
757		// Only allow to set TimestampBound before the first query.
758		t.tb = tb
759	}
760	return t
761}
762
763// ReadWriteTransaction provides a locking read-write transaction.
764//
765// This type of transaction is the only way to write data into Cloud Spanner;
766// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
767// transactions internally. These transactions rely on pessimistic locking and,
768// if necessary, two-phase commit. Locking read-write transactions may abort,
769// requiring the application to retry. However, the interface exposed by
770// (*Client).ReadWriteTransaction eliminates the need for applications to write
771// retry loops explicitly.
772//
773// Locking transactions may be used to atomically read-modify-write data
774// anywhere in a database. This type of transaction is externally consistent.
775//
776// Clients should attempt to minimize the amount of time a transaction is
777// active. Faster transactions commit with higher probability and cause less
778// contention. Cloud Spanner attempts to keep read locks active as long as the
779// transaction continues to do reads.  Long periods of inactivity at the client
780// may cause Cloud Spanner to release a transaction's locks and abort it.
781//
782// Reads performed within a transaction acquire locks on the data being
783// read. Writes can only be done at commit time, after all reads have been
784// completed. Conceptually, a read-write transaction consists of zero or more
785// reads or SQL queries followed by a commit.
786//
787// See (*Client).ReadWriteTransaction for an example.
788//
789// Semantics
790//
791// Cloud Spanner can commit the transaction if all read locks it acquired are
792// still valid at commit time, and it is able to acquire write locks for all
793// writes. Cloud Spanner can abort the transaction for any reason. If a commit
794// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
795// not modified any user data in Cloud Spanner.
796//
797// Unless the transaction commits, Cloud Spanner makes no guarantees about how
798// long the transaction's locks were held for. It is an error to use Cloud
799// Spanner locks for any sort of mutual exclusion other than between Cloud
800// Spanner transactions themselves.
801//
802// Aborted transactions
803//
804// Application code does not need to retry explicitly; RunInTransaction will
805// automatically retry a transaction if an attempt results in an abort. The lock
806// priority of a transaction increases after each prior aborted transaction,
807// meaning that the next attempt has a slightly better chance of success than
808// before.
809//
810// Under some circumstances (e.g., many transactions attempting to modify the
811// same row(s)), a transaction can abort many times in a short period before
812// successfully committing. Thus, it is not a good idea to cap the number of
813// retries a transaction can attempt; instead, it is better to limit the total
814// amount of wall time spent retrying.
815//
816// Idle transactions
817//
818// A transaction is considered idle if it has no outstanding reads or SQL
819// queries and has not started a read or SQL query within the last 10
820// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
821// hold on to locks indefinitely. In that case, the commit will fail with error
822// ABORTED.
823//
824// If this behavior is undesirable, periodically executing a simple SQL query
825// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
826// idle.
type ReadWriteTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
	// during the initialization of ReadWriteTransaction.
	tx transactionID
	// mu protects concurrent access to the internal states of
	// ReadWriteTransaction.
	mu sync.Mutex
	// state is the current transaction status of the read-write transaction.
	// BufferWrite only accepts mutations while the state is txActive.
	state txState
	// wb is the set of buffered mutations waiting to be committed. BufferWrite
	// appends to it while holding mu.
	wb []*Mutation
}
842
843// BufferWrite adds a list of mutations to the set of updates that will be
844// applied when the transaction is committed. It does not actually apply the
845// write until the transaction is committed, so the operation does not block.
846// The effects of the write won't be visible to any reads (including reads done
847// in the same transaction) until the transaction commits.
848//
849// See the example for Client.ReadWriteTransaction.
850func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
851	t.mu.Lock()
852	defer t.mu.Unlock()
853	if t.state == txClosed {
854		return errTxClosed()
855	}
856	if t.state != txActive {
857		return errUnexpectedTxState(t.state)
858	}
859	t.wb = append(t.wb, ms...)
860	return nil
861}
862
863// Update executes a DML statement against the database. It returns the number
864// of affected rows. Update returns an error if the statement is a query.
865// However, the query is executed, and any data read will be validated upon
866// commit.
867func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
868	mode := sppb.ExecuteSqlRequest_NORMAL
869	return t.update(ctx, stmt, QueryOptions{
870		Mode:    &mode,
871		Options: t.qo.Options,
872	})
873}
874
875// UpdateWithOptions executes a DML statement against the database. It returns
876// the number of affected rows. The given QueryOptions will be used for the
877// execution of this statement.
878func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
879	return t.update(ctx, stmt, t.qo.merge(opts))
880}
881
882func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
883	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
884	defer func() { trace.EndSpan(ctx, err) }()
885	req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts)
886	if err != nil {
887		return 0, err
888	}
889	resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req)
890	if err != nil {
891		return 0, ToSpannerError(err)
892	}
893	if resultSet.Stats == nil {
894		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
895	}
896	return extractRowCount(resultSet.Stats)
897}
898
899// BatchUpdate groups one or more DML statements and sends them to Spanner in a
900// single RPC. This is an efficient way to execute multiple DML statements.
901//
902// A slice of counts is returned, where each count represents the number of
903// affected rows for the given query at the same index. If an error occurs,
904// counts will be returned up to the query that encountered the error.
905func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
906	return t.BatchUpdateWithOptions(ctx, stmts, QueryOptions{})
907}
908
909// BatchUpdateWithOptions groups one or more DML statements and sends them to
910// Spanner in a single RPC. This is an efficient way to execute multiple DML
911// statements.
912//
913// A slice of counts is returned, where each count represents the number of
914// affected rows for the given query at the same index. If an error occurs,
915// counts will be returned up to the query that encountered the error.
916//
917// The request tag and priority given in the QueryOptions are included with the
918// RPC. Any other options that are set in the QueryOptions struct are ignored.
919func (t *ReadWriteTransaction) BatchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) {
920	return t.batchUpdateWithOptions(ctx, stmts, t.qo.merge(opts))
921}
922
923func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts []Statement, opts QueryOptions) (_ []int64, err error) {
924	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
925	defer func() { trace.EndSpan(ctx, err) }()
926
927	sh, ts, err := t.acquire(ctx)
928	if err != nil {
929		return nil, err
930	}
931	// Cloud Spanner will return "Session not found" on bad sessions.
932	sid := sh.getID()
933	if sid == "" {
934		// Might happen if transaction is closed in the middle of a API call.
935		return nil, errSessionClosed(sh)
936	}
937
938	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
939	for _, st := range stmts {
940		params, paramTypes, err := st.convertParams()
941		if err != nil {
942			return nil, err
943		}
944		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
945			Sql:        st.SQL,
946			Params:     params,
947			ParamTypes: paramTypes,
948		})
949	}
950
951	resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{
952		Session:        sh.getID(),
953		Transaction:    ts,
954		Statements:     sppbStmts,
955		Seqno:          atomic.AddInt64(&t.sequenceNumber, 1),
956		RequestOptions: createRequestOptions(opts.Priority, opts.RequestTag, t.txOpts.TransactionTag),
957	})
958	if err != nil {
959		return nil, ToSpannerError(err)
960	}
961
962	var counts []int64
963	for _, rs := range resp.ResultSets {
964		count, err := extractRowCount(rs.Stats)
965		if err != nil {
966			return nil, err
967		}
968		counts = append(counts, count)
969	}
970	if resp.Status != nil && resp.Status.Code != 0 {
971		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
972	}
973	return counts, nil
974}
975
976// acquire implements txReadEnv.acquire.
977func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
978	ts := &sppb.TransactionSelector{
979		Selector: &sppb.TransactionSelector_Id{
980			Id: t.tx,
981		},
982	}
983	t.mu.Lock()
984	defer t.mu.Unlock()
985	switch t.state {
986	case txClosed:
987		return nil, nil, errTxClosed()
988	case txActive:
989		return t.sh, ts, nil
990	}
991	return nil, nil, errUnexpectedTxState(t.state)
992}
993
994// release implements txReadEnv.release.
995func (t *ReadWriteTransaction) release(err error) {
996	t.mu.Lock()
997	sh := t.sh
998	t.mu.Unlock()
999	if sh != nil && isSessionNotFoundError(err) {
1000		sh.destroy()
1001	}
1002}
1003
1004func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
1005	res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
1006		Session: sid,
1007		Options: &sppb.TransactionOptions{
1008			Mode: &sppb.TransactionOptions_ReadWrite_{
1009				ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1010			},
1011		},
1012	})
1013	if err != nil {
1014		return nil, err
1015	}
1016	if res.Id == nil {
1017		return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.")
1018	}
1019	return res.Id, nil
1020}
1021
1022// begin starts a read-write transacton on Cloud Spanner, it is always called
1023// before any of the public APIs.
1024func (t *ReadWriteTransaction) begin(ctx context.Context) error {
1025	if t.tx != nil {
1026		t.state = txActive
1027		return nil
1028	}
1029	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
1030	if err == nil {
1031		t.tx = tx
1032		t.state = txActive
1033		return nil
1034	}
1035	if isSessionNotFoundError(err) {
1036		t.sh.destroy()
1037	}
1038	return err
1039}
1040
// CommitResponse provides a response of a transaction commit in a database.
type CommitResponse struct {
	// CommitTs is the commit time for a transaction. It is left at the zero
	// time.Time when the commit response carries no commit timestamp.
	CommitTs time.Time
	// CommitStats is the commit statistics for a transaction. It is only
	// populated when CommitOptions.ReturnCommitStats was set for the commit.
	CommitStats *sppb.CommitResponse_CommitStats
}
1048
// CommitOptions provides options for committing a transaction in a database.
type CommitOptions struct {
	// ReturnCommitStats is forwarded to the commit request; when it is true,
	// the commit statistics of the response are copied into
	// CommitResponse.CommitStats.
	ReturnCommitStats bool
}
1053
1054// commit tries to commit a readwrite transaction to Cloud Spanner. It also
1055// returns the commit response for the transactions.
1056func (t *ReadWriteTransaction) commit(ctx context.Context, options CommitOptions) (CommitResponse, error) {
1057	resp := CommitResponse{}
1058	t.mu.Lock()
1059	t.state = txClosed // No further operations after commit.
1060	mPb, err := mutationsProto(t.wb)
1061	t.mu.Unlock()
1062	if err != nil {
1063		return resp, err
1064	}
1065
1066	// In case that sessionHandle was destroyed but transaction body fails to
1067	// report it.
1068	sid, client := t.sh.getID(), t.sh.getClient()
1069	if sid == "" || client == nil {
1070		return resp, errSessionClosed(t.sh)
1071	}
1072
1073	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
1074		Session: sid,
1075		Transaction: &sppb.CommitRequest_TransactionId{
1076			TransactionId: t.tx,
1077		},
1078		RequestOptions:    createRequestOptions(t.txOpts.CommitPriority, "", t.txOpts.TransactionTag),
1079		Mutations:         mPb,
1080		ReturnCommitStats: options.ReturnCommitStats,
1081	})
1082	if e != nil {
1083		return resp, toSpannerErrorWithCommitInfo(e, true)
1084	}
1085	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1086		resp.CommitTs = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1087	}
1088	if options.ReturnCommitStats {
1089		resp.CommitStats = res.CommitStats
1090	}
1091	if isSessionNotFoundError(err) {
1092		t.sh.destroy()
1093	}
1094	return resp, err
1095}
1096
1097// rollback is called when a commit is aborted or the transaction body runs
1098// into error.
1099func (t *ReadWriteTransaction) rollback(ctx context.Context) {
1100	t.mu.Lock()
1101	// Forbid further operations on rollbacked transaction.
1102	t.state = txClosed
1103	t.mu.Unlock()
1104	// In case that sessionHandle was destroyed but transaction body fails to
1105	// report it.
1106	sid, client := t.sh.getID(), t.sh.getClient()
1107	if sid == "" || client == nil {
1108		return
1109	}
1110	err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{
1111		Session:       sid,
1112		TransactionId: t.tx,
1113	})
1114	if isSessionNotFoundError(err) {
1115		t.sh.destroy()
1116	}
1117}
1118
1119// runInTransaction executes f under a read-write transaction context.
1120func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (CommitResponse, error) {
1121	var (
1122		resp            CommitResponse
1123		err             error
1124		errDuringCommit bool
1125	)
1126	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
1127		// Try to commit if transaction body returns no error.
1128		resp, err = t.commit(ctx, t.txOpts.CommitOptions)
1129		errDuringCommit = err != nil
1130	}
1131	if err != nil {
1132		if isAbortedErr(err) {
1133			// Retry the transaction using the same session on ABORT error.
1134			// Cloud Spanner will create the new transaction with the previous
1135			// one's wound-wait priority.
1136			return resp, err
1137		}
1138		if isSessionNotFoundError(err) {
1139			t.sh.destroy()
1140			return resp, err
1141		}
1142		// Rollback the transaction unless the error occurred during the
1143		// commit. Executing a rollback after a commit has failed will
1144		// otherwise cause an error. Note that transient errors, such as
1145		// UNAVAILABLE, are already handled in the gRPC layer and do not show
1146		// up here. Context errors (deadline exceeded / canceled) during
1147		// commits are also not rolled back.
1148		if !errDuringCommit {
1149			t.rollback(ctx)
1150		}
1151		return resp, err
1152	}
1153	// err == nil, return commit response.
1154	return resp, nil
1155}
1156
// ReadWriteStmtBasedTransaction provides a wrapper of ReadWriteTransaction in
// order to run a read-write transaction in a statement-based way.
//
// This struct is returned by NewReadWriteStmtBasedTransaction and contains
// Commit() and Rollback() methods to end a transaction.
type ReadWriteStmtBasedTransaction struct {
	// ReadWriteTransaction contains methods for performing transactional reads.
	ReadWriteTransaction

	// options holds transaction options.
	// NOTE(review): NewReadWriteStmtBasedTransactionWithOptions stores the
	// caller's options in the embedded txOpts field and does not assign this
	// field; it looks unused here — confirm against the rest of the package.
	options TransactionOptions
}
1168
1169// NewReadWriteStmtBasedTransaction starts a read-write transaction. Commit() or
1170// Rollback() must be called to end a transaction. If Commit() or Rollback() is
1171// not called, the session that is used by the transaction will not be returned
1172// to the pool and cause a session leak.
1173//
1174// This method should only be used when manual error handling and retry
1175// management is needed. Cloud Spanner may abort a read/write transaction at any
1176// moment, and each statement that is executed on the transaction should be
1177// checked for an Aborted error, including queries and read operations.
1178//
1179// For most use cases, client.ReadWriteTransaction should be used, as it will
1180// handle all Aborted and 'Session not found' errors automatically.
1181func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWriteStmtBasedTransaction, error) {
1182	return NewReadWriteStmtBasedTransactionWithOptions(ctx, c, TransactionOptions{})
1183}
1184
1185// NewReadWriteStmtBasedTransactionWithOptions starts a read-write transaction
1186// with configurable options. Commit() or Rollback() must be called to end a
1187// transaction. If Commit() or Rollback() is not called, the session that is
1188// used by the transaction will not be returned to the pool and cause a session
1189// leak.
1190//
1191// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
1192// NewReadWriteStmtBasedTransaction.
1193func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
1194	var (
1195		sh  *sessionHandle
1196		err error
1197		t   *ReadWriteStmtBasedTransaction
1198	)
1199	sh, err = c.idleSessions.takeWriteSession(ctx)
1200	if err != nil {
1201		// If session retrieval fails, just fail the transaction.
1202		return nil, err
1203	}
1204	t = &ReadWriteStmtBasedTransaction{
1205		ReadWriteTransaction: ReadWriteTransaction{
1206			tx: sh.getTransactionID(),
1207		},
1208	}
1209	t.txReadOnly.sh = sh
1210	t.txReadOnly.txReadEnv = t
1211	t.txReadOnly.qo = c.qo
1212	t.txOpts = options
1213
1214	if err = t.begin(ctx); err != nil {
1215		if sh != nil {
1216			sh.recycle()
1217		}
1218		return nil, err
1219	}
1220	return t, err
1221}
1222
1223// Commit tries to commit a readwrite transaction to Cloud Spanner. It also
1224// returns the commit timestamp for the transactions.
1225func (t *ReadWriteStmtBasedTransaction) Commit(ctx context.Context) (time.Time, error) {
1226	resp, err := t.CommitWithReturnResp(ctx)
1227	return resp.CommitTs, err
1228}
1229
1230// CommitWithReturnResp tries to commit a readwrite transaction. It also returns
1231// the commit timestamp and stats for the transactions.
1232func (t *ReadWriteStmtBasedTransaction) CommitWithReturnResp(ctx context.Context) (CommitResponse, error) {
1233	resp, err := t.commit(ctx, t.txOpts.CommitOptions)
1234	// Rolling back an aborted transaction is not necessary.
1235	if err != nil && status.Code(err) != codes.Aborted {
1236		t.rollback(ctx)
1237	}
1238	if t.sh != nil {
1239		t.sh.recycle()
1240	}
1241	return resp, err
1242}
1243
1244// Rollback is called to cancel the ongoing transaction that has not been
1245// committed yet.
1246func (t *ReadWriteStmtBasedTransaction) Rollback(ctx context.Context) {
1247	t.rollback(ctx)
1248	if t.sh != nil {
1249		t.sh.recycle()
1250	}
1251}
1252
// writeOnlyTransaction provides the most efficient way of doing write-only
// transactions. It essentially does blind writes to Cloud Spanner: the
// mutations are sent in a single-use read-write transaction as part of the
// commit request itself (see applyAtLeastOnce).
type writeOnlyTransaction struct {
	// sp is the session pool which writeOnlyTransaction uses to get Cloud
	// Spanner sessions for blind writes.
	sp *sessionPool
	// transactionTag is the tag that will be included with the CommitRequest
	// of the write-only transaction.
	transactionTag string
	// commitPriority is the RPC priority to use for the commit operation.
	commitPriority sppb.RequestOptions_Priority
}
1265
1266// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
1267// unless one of the following happens:
1268//
1269//     1) Context times out.
1270//     2) An unretryable error (e.g. database not found) occurs.
1271//     3) There is a malformed Mutation object.
1272func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
1273	var (
1274		ts time.Time
1275		sh *sessionHandle
1276	)
1277	mPb, err := mutationsProto(ms)
1278	if err != nil {
1279		// Malformed mutation found, just return the error.
1280		return ts, err
1281	}
1282
1283	// Retry-loop for aborted transactions.
1284	// TODO: Replace with generic retryer.
1285	for {
1286		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1287			// No usable session for doing the commit, take one from pool.
1288			sh, err = t.sp.take(ctx)
1289			if err != nil {
1290				// sessionPool.Take already retries for session
1291				// creations/retrivals.
1292				return ts, err
1293			}
1294			defer sh.recycle()
1295		}
1296		res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
1297			Session: sh.getID(),
1298			Transaction: &sppb.CommitRequest_SingleUseTransaction{
1299				SingleUseTransaction: &sppb.TransactionOptions{
1300					Mode: &sppb.TransactionOptions_ReadWrite_{
1301						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1302					},
1303				},
1304			},
1305			Mutations:      mPb,
1306			RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag),
1307		})
1308		if err != nil && !isAbortedErr(err) {
1309			if isSessionNotFoundError(err) {
1310				// Discard the bad session.
1311				sh.destroy()
1312			}
1313			return ts, toSpannerErrorWithCommitInfo(err, true)
1314		} else if err == nil {
1315			if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1316				ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1317			}
1318			break
1319		}
1320	}
1321	return ts, ToSpannerError(err)
1322}
1323
1324// isAbortedErr returns true if the error indicates that an gRPC call is
1325// aborted on the server side.
1326func isAbortedErr(err error) bool {
1327	if err == nil {
1328		return false
1329	}
1330	if ErrCode(err) == codes.Aborted {
1331		return true
1332	}
1333	return false
1334}
1335