1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	vkit "cloud.google.com/go/spanner/apiv1"
27	"google.golang.org/api/iterator"
28	sppb "google.golang.org/genproto/googleapis/spanner/v1"
29	"google.golang.org/grpc/codes"
30)
31
// transactionID stores a transaction ID which uniquely identifies a transaction
// in Cloud Spanner. Within this file it is populated from the Id field of a
// BeginTransaction response and sent back in a TransactionSelector.
type transactionID []byte
35
// txReadEnv manages a read-transaction environment consisting of a session
// handle and a transaction selector.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to
	// perform a transactional read: the session handle to issue the RPC on
	// and the transaction selector to put into the request.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp sets the transaction's read timestamp.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal
	// with session recycling. The error argument lets an implementation react
	// to session failures (for example, destroying a session that the server
	// reported as not found).
	release(error)
}
48
// txReadOnly contains methods for doing transactional reads. It is embedded
// by ReadOnlyTransaction and ReadWriteTransaction, which supply the txReadEnv
// implementation.
type txReadOnly struct {
	// read-transaction environment for performing transactional read
	// operations.
	txReadEnv

	// Atomic. Only needed for DML statements, but used for all statements.
	// It is incremented with atomic.AddInt64 for every request that carries a
	// Seqno.
	sequenceNumber int64

	// replaceSessionFunc is a function that can be called to replace the
	// session that is used by the transaction. This function should only be
	// defined for single-use transactions that can safely be retried on a
	// different session. All other transactions will set this function to nil.
	replaceSessionFunc func(ctx context.Context) error

	// sp is the session pool for allocating a session to execute the read-only
	// transaction. It is set only once during initialization of the
	// txReadOnly.
	sp *sessionPool
	// sh is the sessionHandle allocated from sp.
	sh *sessionHandle

	// qo provides options for executing a sql query. These act as the
	// transaction-level defaults that per-call options are merged onto.
	qo QueryOptions
}
74
75// errSessionClosed returns error for using a recycled/destroyed session
76func errSessionClosed(sh *sessionHandle) error {
77	return spannerErrorf(codes.FailedPrecondition,
78		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
79}
80
81// Read returns a RowIterator for reading multiple rows from the database.
82func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
83	return t.ReadWithOptions(ctx, table, keys, columns, nil)
84}
85
86// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}.
87func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
88	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
89}
90
// ReadOptions provides options for reading rows from a database. The zero
// value means: read by primary key, without a row limit.
type ReadOptions struct {
	// The index to use for reading. If non-empty, you can only read columns
	// that are part of the index key, part of the primary key, or stored in the
	// index due to a STORING clause in the index definition.
	Index string

	// The maximum number of rows to read. A limit value less than 1 means no
	// limit.
	Limit int
}
102
103// ReadWithOptions returns a RowIterator for reading multiple rows from the
104// database. Pass a ReadOptions to modify the read operation.
105func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
106	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
107	defer func() { trace.EndSpan(ctx, ri.err) }()
108	var (
109		sh  *sessionHandle
110		ts  *sppb.TransactionSelector
111		err error
112	)
113	kset, err := keys.keySetProto()
114	if err != nil {
115		return &RowIterator{err: err}
116	}
117	if sh, ts, err = t.acquire(ctx); err != nil {
118		return &RowIterator{err: err}
119	}
120	// Cloud Spanner will return "Session not found" on bad sessions.
121	sid, client := sh.getID(), sh.getClient()
122	if sid == "" || client == nil {
123		// Might happen if transaction is closed in the middle of a API call.
124		return &RowIterator{err: errSessionClosed(sh)}
125	}
126	index := ""
127	limit := 0
128	if opts != nil {
129		index = opts.Index
130		if opts.Limit > 0 {
131			limit = opts.Limit
132		}
133	}
134	return stream(
135		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
136		sh.session.logger,
137		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
138			return client.StreamingRead(ctx,
139				&sppb.ReadRequest{
140					Session:     sid,
141					Transaction: ts,
142					Table:       table,
143					Index:       index,
144					Columns:     columns,
145					KeySet:      kset,
146					ResumeToken: resumeToken,
147					Limit:       int64(limit),
148				})
149		},
150		t.setTimestamp,
151		t.release,
152	)
153}
154
155// errRowNotFound returns error for not being able to read the row identified by
156// key.
157func errRowNotFound(table string, key Key) error {
158	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
159}
160
161// errRowNotFoundByIndex returns error for not being able to read the row by index.
162func errRowNotFoundByIndex(table string, key Key, index string) error {
163	return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
164}
165
166// errMultipleRowsFound returns error for receiving more than one row when reading a single row using an index.
167func errMultipleRowsFound(table string, key Key, index string) error {
168	return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
169}
170
171// ReadRow reads a single row from the database.
172//
173// If no row is present with the given key, then ReadRow returns an error where
174// spanner.ErrCode(err) is codes.NotFound.
175func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
176	iter := t.Read(ctx, table, key, columns)
177	defer iter.Stop()
178	row, err := iter.Next()
179	switch err {
180	case iterator.Done:
181		return nil, errRowNotFound(table, key)
182	case nil:
183		return row, nil
184	default:
185		return nil, err
186	}
187}
188
189// ReadRowUsingIndex reads a single row from the database using an index.
190//
191// If no row is present with the given index, then ReadRowUsingIndex returns an
192// error where spanner.ErrCode(err) is codes.NotFound.
193//
194// If more than one row received with the given index, then ReadRowUsingIndex
195// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition.
196func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) {
197	iter := t.ReadUsingIndex(ctx, table, index, key, columns)
198	defer iter.Stop()
199	row, err := iter.Next()
200	switch err {
201	case iterator.Done:
202		return nil, errRowNotFoundByIndex(table, key, index)
203	case nil:
204		// If more than one row found, return an error.
205		_, err := iter.Next()
206		switch err {
207		case iterator.Done:
208			return row, nil
209		case nil:
210			return nil, errMultipleRowsFound(table, key, index)
211		default:
212			return nil, err
213		}
214	default:
215		return nil, err
216	}
217}
218
// QueryOptions provides options for executing a sql query from a database.
type QueryOptions struct {
	// Mode is the query mode to request. When nil, prepareExecuteSQL falls
	// back to ExecuteSqlRequest_NORMAL.
	Mode *sppb.ExecuteSqlRequest_QueryMode
	// Options is passed through as the QueryOptions field of the
	// ExecuteSqlRequest.
	Options *sppb.ExecuteSqlRequest_QueryOptions
}
224
225// merge combines two QueryOptions that the input parameter will have higher
226// order of precedence.
227func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
228	merged := QueryOptions{
229		Mode:    qo.Mode,
230		Options: &sppb.ExecuteSqlRequest_QueryOptions{},
231	}
232	if opts.Mode != nil {
233		merged.Mode = opts.Mode
234	}
235	merged.Options.XXX_Merge(qo.Options)
236	merged.Options.XXX_Merge(opts.Options)
237	return merged
238}
239
240// Query executes a query against the database. It returns a RowIterator for
241// retrieving the resulting rows.
242//
243// Query returns only row data, without a query plan or execution statistics.
244// Use QueryWithStats to get rows along with the plan and statistics. Use
245// AnalyzeQuery to get just the plan.
246func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
247	mode := sppb.ExecuteSqlRequest_NORMAL
248	return t.query(ctx, statement, QueryOptions{
249		Mode:    &mode,
250		Options: t.qo.Options,
251	})
252}
253
254// QueryWithOptions executes a SQL statment against the database. It returns
255// a RowIterator for retrieving the resulting rows. The sql query execution
256// will be optimized based on the given query options.
257func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator {
258	return t.query(ctx, statement, t.qo.merge(opts))
259}
260
261// QueryWithStats executes a SQL statement against the database. It returns
262// a RowIterator for retrieving the resulting rows. The RowIterator will also
263// be populated with a query plan and execution statistics.
264func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
265	mode := sppb.ExecuteSqlRequest_PROFILE
266	return t.query(ctx, statement, QueryOptions{
267		Mode:    &mode,
268		Options: t.qo.Options,
269	})
270}
271
272// AnalyzeQuery returns the query plan for statement.
273func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
274	mode := sppb.ExecuteSqlRequest_PLAN
275	iter := t.query(ctx, statement, QueryOptions{
276		Mode:    &mode,
277		Options: t.qo.Options,
278	})
279	defer iter.Stop()
280	for {
281		_, err := iter.Next()
282		if err == iterator.Done {
283			break
284		}
285		if err != nil {
286			return nil, err
287		}
288	}
289	if iter.QueryPlan == nil {
290		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
291	}
292	return iter.QueryPlan, nil
293}
294
295func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) {
296	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
297	defer func() { trace.EndSpan(ctx, ri.err) }()
298	req, sh, err := t.prepareExecuteSQL(ctx, statement, options)
299	if err != nil {
300		return &RowIterator{err: err}
301	}
302	client := sh.getClient()
303	return streamWithReplaceSessionFunc(
304		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
305		sh.session.logger,
306		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
307			req.ResumeToken = resumeToken
308			req.Session = t.sh.getID()
309			return client.ExecuteStreamingSql(ctx, req)
310		},
311		t.replaceSessionFunc,
312		t.setTimestamp,
313		t.release)
314}
315
316func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
317	sh, ts, err := t.acquire(ctx)
318	if err != nil {
319		return nil, nil, err
320	}
321	// Cloud Spanner will return "Session not found" on bad sessions.
322	sid := sh.getID()
323	if sid == "" {
324		// Might happen if transaction is closed in the middle of a API call.
325		return nil, nil, errSessionClosed(sh)
326	}
327	params, paramTypes, err := stmt.convertParams()
328	if err != nil {
329		return nil, nil, err
330	}
331	mode := sppb.ExecuteSqlRequest_NORMAL
332	if options.Mode != nil {
333		mode = *options.Mode
334	}
335	req := &sppb.ExecuteSqlRequest{
336		Session:      sid,
337		Transaction:  ts,
338		Sql:          stmt.SQL,
339		QueryMode:    mode,
340		Seqno:        atomic.AddInt64(&t.sequenceNumber, 1),
341		Params:       params,
342		ParamTypes:   paramTypes,
343		QueryOptions: options.Options,
344	}
345	return req, sh, nil
346}
347
// txState is the status of a transaction.
type txState int

const (
	// transaction is new, waiting to be initialized.
	txNew txState = iota
	// transaction is being initialized.
	txInit
	// transaction is active and can perform read/write.
	txActive
	// transaction is closed, cannot be used anymore.
	txClosed
)
361
362// errRtsUnavailable returns error for read transaction's read timestamp being
363// unavailable.
364func errRtsUnavailable() error {
365	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
366}
367
368// errTxClosed returns error for using a closed transaction.
369func errTxClosed() error {
370	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
371}
372
373// errUnexpectedTxState returns error for transaction enters an unexpected state.
374func errUnexpectedTxState(ts txState) error {
375	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
376}
377
// ReadOnlyTransaction provides a snapshot transaction with guaranteed
// consistency across reads, but does not allow writes. Read-only transactions
// can be configured to read at timestamps in the past.
//
// Read-only transactions do not take locks. Instead, they work by choosing a
// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
// they do not acquire locks, they do not block concurrent read-write
// transactions.
//
// Unlike locking read-write transactions, read-only transactions never abort.
// They can fail if the chosen read timestamp is garbage collected; however, the
// default garbage collection policy is generous enough that most applications
// do not need to worry about this in practice. See the documentation of
// TimestampBound for more details.
//
// A ReadOnlyTransaction consumes resources on the server until Close is called.
type ReadOnlyTransaction struct {
	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
	mu sync.Mutex
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// singleUse indicates that the transaction can be used for only one read.
	singleUse bool
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadOnlyTransaction. While the transaction is in txInit, a non-nil tx
	// also marks that some caller has taken ownership of the initialization.
	tx transactionID
	// txReadyOrClosed is for broadcasting that transaction ID has been returned
	// by Cloud Spanner or that transaction is closed.
	txReadyOrClosed chan struct{}
	// state is the current transaction status of the ReadOnly transaction.
	state txState
	// rts is the read timestamp returned by transactional reads.
	rts time.Time
	// tb is the read staleness bound specification for transactional reads.
	tb TimestampBound
}
414
415// errTxInitTimeout returns error for timeout in waiting for initialization of
416// the transaction.
417func errTxInitTimeout() error {
418	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
419}
420
421// getTimestampBound returns the read staleness bound specified for the
422// ReadOnlyTransaction.
423func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
424	t.mu.Lock()
425	defer t.mu.Unlock()
426	return t.tb
427}
428
429// begin starts a snapshot read-only Transaction on Cloud Spanner.
430func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
431	var (
432		locked bool
433		tx     transactionID
434		rts    time.Time
435		sh     *sessionHandle
436		err    error
437		res    *sppb.Transaction
438	)
439	defer func() {
440		if !locked {
441			t.mu.Lock()
442			// Not necessary, just to make it clear that t.mu is being held when
443			// locked == true.
444			locked = true
445		}
446		if t.state != txClosed {
447			// Signal other initialization routines.
448			close(t.txReadyOrClosed)
449			t.txReadyOrClosed = make(chan struct{})
450		}
451		t.mu.Unlock()
452		if err != nil && sh != nil {
453			// Got a valid session handle, but failed to initialize transaction=
454			// on Cloud Spanner.
455			if isSessionNotFoundError(err) {
456				sh.destroy()
457			}
458			// If sh.destroy was already executed, this becomes a noop.
459			sh.recycle()
460		}
461	}()
462	// Retry the BeginTransaction call if a 'Session not found' is returned.
463	for {
464		sh, err = t.sp.take(ctx)
465		if err != nil {
466			return err
467		}
468		res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
469			Session: sh.getID(),
470			Options: &sppb.TransactionOptions{
471				Mode: &sppb.TransactionOptions_ReadOnly_{
472					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
473				},
474			},
475		})
476		if isSessionNotFoundError(err) {
477			sh.destroy()
478			continue
479		} else if err == nil {
480			tx = res.Id
481			if res.ReadTimestamp != nil {
482				rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
483			}
484		} else {
485			err = toSpannerError(err)
486		}
487		break
488	}
489	t.mu.Lock()
490
491	// defer function will be executed with t.mu being held.
492	locked = true
493
494	// During the execution of t.begin(), t.Close() was invoked.
495	if t.state == txClosed {
496		return errSessionClosed(sh)
497	}
498
499	// If begin() fails, this allows other queries to take over the
500	// initialization.
501	t.tx = nil
502	if err == nil {
503		t.tx = tx
504		t.rts = rts
505		t.sh = sh
506		// State transite to txActive.
507		t.state = txActive
508	}
509	return err
510}
511
512// acquire implements txReadEnv.acquire.
513func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
514	if err := checkNestedTxn(ctx); err != nil {
515		return nil, nil, err
516	}
517	if t.singleUse {
518		return t.acquireSingleUse(ctx)
519	}
520	return t.acquireMultiUse(ctx)
521}
522
523func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
524	t.mu.Lock()
525	defer t.mu.Unlock()
526	switch t.state {
527	case txClosed:
528		// A closed single-use transaction can never be reused.
529		return nil, nil, errTxClosed()
530	case txNew:
531		t.state = txClosed
532		ts := &sppb.TransactionSelector{
533			Selector: &sppb.TransactionSelector_SingleUse{
534				SingleUse: &sppb.TransactionOptions{
535					Mode: &sppb.TransactionOptions_ReadOnly_{
536						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
537					},
538				},
539			},
540		}
541		sh, err := t.sp.take(ctx)
542		if err != nil {
543			return nil, nil, err
544		}
545
546		// Install session handle into t, which can be used for readonly
547		// operations later.
548		t.sh = sh
549		return sh, ts, nil
550	}
551	us := t.state
552
553	// SingleUse transaction should only be in either txNew state or txClosed
554	// state.
555	return nil, nil, errUnexpectedTxState(us)
556}
557
// acquireMultiUse hands out the read environment of a multi-use read-only
// transaction, lazily beginning the transaction on Cloud Spanner on first use.
//
// It loops over the transaction state until it can return: a new transaction
// is moved to txInit; in txInit exactly one caller (the one that finds
// t.tx == nil) runs begin while the others wait on t.txReadyOrClosed or their
// context; in txActive the session handle and an ID-based selector are
// returned; in txClosed an error is returned.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// State transit to txInit so that no further TimestampBound change
			// is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Someone else owns the initialization. Wait for a transaction
				// ID to become ready. The channel is captured under the lock
				// because begin replaces it after closing it.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// The waiting for initialization is timeout, return error
					// directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction. The empty,
			// non-nil ID is the marker other callers test for above.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			//
			// TODO: consider adding a transaction option which allow queries to
			//  initiate transactions by themselves. Note that this option might
			//  not be always good because the ID of the new transaction won't
			//  be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}

			// If t.begin() succeeded, t.state should have been changed to
			// txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Reached only for a state value not handled by the switch above.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
617
618func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
619	t.mu.Lock()
620	defer t.mu.Unlock()
621	if t.rts.IsZero() {
622		t.rts = ts
623	}
624}
625
626// release implements txReadEnv.release.
627func (t *ReadOnlyTransaction) release(err error) {
628	t.mu.Lock()
629	sh := t.sh
630	t.mu.Unlock()
631	if sh != nil { // sh could be nil if t.acquire() fails.
632		if isSessionNotFoundError(err) {
633			sh.destroy()
634		}
635		if t.singleUse {
636			// If session handle is already destroyed, this becomes a noop.
637			sh.recycle()
638		}
639	}
640}
641
642// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
643// after being closed.
644func (t *ReadOnlyTransaction) Close() {
645	if t.singleUse {
646		return
647	}
648	t.mu.Lock()
649	if t.state != txClosed {
650		t.state = txClosed
651		close(t.txReadyOrClosed)
652	}
653	sh := t.sh
654	t.mu.Unlock()
655	if sh == nil {
656		return
657	}
658	// If session handle is already destroyed, this becomes a noop. If there are
659	// still active queries and if the recycled session is reused before they
660	// complete, Cloud Spanner will cancel them on behalf of the new transaction
661	// on the session.
662	if sh != nil {
663		sh.recycle()
664	}
665}
666
667// Timestamp returns the timestamp chosen to perform reads and queries in this
668// transaction. The value can only be read after some read or query has either
669// returned some data or completed without returning any data.
670func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
671	t.mu.Lock()
672	defer t.mu.Unlock()
673	if t.rts.IsZero() {
674		return t.rts, errRtsUnavailable()
675	}
676	return t.rts, nil
677}
678
679// WithTimestampBound specifies the TimestampBound to use for read or query.
680// This can only be used before the first read or query is invoked. Note:
681// bounded staleness is not available with general ReadOnlyTransactions; use a
682// single-use ReadOnlyTransaction instead.
683//
684// The returned value is the ReadOnlyTransaction so calls can be chained.
685func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
686	t.mu.Lock()
687	defer t.mu.Unlock()
688	if t.state == txNew {
689		// Only allow to set TimestampBound before the first query.
690		t.tb = tb
691	}
692	return t
693}
694
// ReadWriteTransaction provides a locking read-write transaction.
//
// This type of transaction is the only way to write data into Cloud Spanner;
// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
// transactions internally. These transactions rely on pessimistic locking and,
// if necessary, two-phase commit. Locking read-write transactions may abort,
// requiring the application to retry. However, the interface exposed by
// (*Client).ReadWriteTransaction eliminates the need for applications to write
// retry loops explicitly.
//
// Locking transactions may be used to atomically read-modify-write data
// anywhere in a database. This type of transaction is externally consistent.
//
// Clients should attempt to minimize the amount of time a transaction is
// active. Faster transactions commit with higher probability and cause less
// contention. Cloud Spanner attempts to keep read locks active as long as the
// transaction continues to do reads. Long periods of inactivity at the client
// may cause Cloud Spanner to release a transaction's locks and abort it.
//
// Reads performed within a transaction acquire locks on the data being
// read. Writes can only be done at commit time, after all reads have been
// completed. Conceptually, a read-write transaction consists of zero or more
// reads or SQL queries followed by a commit.
//
// See (*Client).ReadWriteTransaction for an example.
//
// Semantics
//
// Cloud Spanner can commit the transaction if all read locks it acquired are
// still valid at commit time, and it is able to acquire write locks for all
// writes. Cloud Spanner can abort the transaction for any reason. If a commit
// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
// not modified any user data in Cloud Spanner.
//
// Unless the transaction commits, Cloud Spanner makes no guarantees about how
// long the transaction's locks were held for. It is an error to use Cloud
// Spanner locks for any sort of mutual exclusion other than between Cloud
// Spanner transactions themselves.
//
// Aborted transactions
//
// Application code does not need to retry explicitly; RunInTransaction will
// automatically retry a transaction if an attempt results in an abort. The lock
// priority of a transaction increases after each prior aborted transaction,
// meaning that the next attempt has a slightly better chance of success than
// before.
//
// Under some circumstances (e.g., many transactions attempting to modify the
// same row(s)), a transaction can abort many times in a short period before
// successfully committing. Thus, it is not a good idea to cap the number of
// retries a transaction can attempt; instead, it is better to limit the total
// amount of wall time spent retrying.
//
// Idle transactions
//
// A transaction is considered idle if it has no outstanding reads or SQL
// queries and has not started a read or SQL query within the last 10
// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
// hold on to locks indefinitely. In that case, the commit will fail with error
// ABORTED.
//
// If this behavior is undesirable, periodically executing a simple SQL query
// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
// idle.
type ReadWriteTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
	// during the initialization of ReadWriteTransaction.
	tx transactionID
	// mu protects concurrent access to the internal states of
	// ReadWriteTransaction.
	mu sync.Mutex
	// state is the current transaction status of the read-write transaction.
	state txState
	// wb is the set of buffered mutations waiting to be committed. BufferWrite
	// appends to it.
	wb []*Mutation
}
774
775// BufferWrite adds a list of mutations to the set of updates that will be
776// applied when the transaction is committed. It does not actually apply the
777// write until the transaction is committed, so the operation does not block.
778// The effects of the write won't be visible to any reads (including reads done
779// in the same transaction) until the transaction commits.
780//
781// See the example for Client.ReadWriteTransaction.
782func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
783	t.mu.Lock()
784	defer t.mu.Unlock()
785	if t.state == txClosed {
786		return errTxClosed()
787	}
788	if t.state != txActive {
789		return errUnexpectedTxState(t.state)
790	}
791	t.wb = append(t.wb, ms...)
792	return nil
793}
794
795// Update executes a DML statement against the database. It returns the number
796// of affected rows. Update returns an error if the statement is a query.
797// However, the query is executed, and any data read will be validated upon
798// commit.
799func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
800	mode := sppb.ExecuteSqlRequest_NORMAL
801	return t.update(ctx, stmt, QueryOptions{
802		Mode:    &mode,
803		Options: t.qo.Options,
804	})
805}
806
807// UpdateWithOptions executes a DML statement against the database. It returns
808// the number of affected rows. The sql query execution will be optimized
809// based on the given query options.
810func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
811	return t.update(ctx, stmt, t.qo.merge(opts))
812}
813
814func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
815	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
816	defer func() { trace.EndSpan(ctx, err) }()
817	req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts)
818	if err != nil {
819		return 0, err
820	}
821	resultSet, err := sh.getClient().ExecuteSql(ctx, req)
822	if err != nil {
823		return 0, toSpannerError(err)
824	}
825	if resultSet.Stats == nil {
826		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
827	}
828	return extractRowCount(resultSet.Stats)
829}
830
831// BatchUpdate groups one or more DML statements and sends them to Spanner in a
832// single RPC. This is an efficient way to execute multiple DML statements.
833//
834// A slice of counts is returned, where each count represents the number of
835// affected rows for the given query at the same index. If an error occurs,
836// counts will be returned up to the query that encountered the error.
837func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
838	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
839	defer func() { trace.EndSpan(ctx, err) }()
840
841	sh, ts, err := t.acquire(ctx)
842	if err != nil {
843		return nil, err
844	}
845	// Cloud Spanner will return "Session not found" on bad sessions.
846	sid := sh.getID()
847	if sid == "" {
848		// Might happen if transaction is closed in the middle of a API call.
849		return nil, errSessionClosed(sh)
850	}
851
852	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
853	for _, st := range stmts {
854		params, paramTypes, err := st.convertParams()
855		if err != nil {
856			return nil, err
857		}
858		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
859			Sql:        st.SQL,
860			Params:     params,
861			ParamTypes: paramTypes,
862		})
863	}
864
865	resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{
866		Session:     sh.getID(),
867		Transaction: ts,
868		Statements:  sppbStmts,
869		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
870	})
871	if err != nil {
872		return nil, toSpannerError(err)
873	}
874
875	var counts []int64
876	for _, rs := range resp.ResultSets {
877		count, err := extractRowCount(rs.Stats)
878		if err != nil {
879			return nil, err
880		}
881		counts = append(counts, count)
882	}
883	if resp.Status.Code != 0 {
884		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
885	}
886	return counts, nil
887}
888
889// acquire implements txReadEnv.acquire.
890func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
891	ts := &sppb.TransactionSelector{
892		Selector: &sppb.TransactionSelector_Id{
893			Id: t.tx,
894		},
895	}
896	t.mu.Lock()
897	defer t.mu.Unlock()
898	switch t.state {
899	case txClosed:
900		return nil, nil, errTxClosed()
901	case txActive:
902		return t.sh, ts, nil
903	}
904	return nil, nil, errUnexpectedTxState(t.state)
905}
906
907// release implements txReadEnv.release.
908func (t *ReadWriteTransaction) release(err error) {
909	t.mu.Lock()
910	sh := t.sh
911	t.mu.Unlock()
912	if sh != nil && isSessionNotFoundError(err) {
913		sh.destroy()
914	}
915}
916
917func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
918	res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
919		Session: sid,
920		Options: &sppb.TransactionOptions{
921			Mode: &sppb.TransactionOptions_ReadWrite_{
922				ReadWrite: &sppb.TransactionOptions_ReadWrite{},
923			},
924		},
925	})
926	if err != nil {
927		return nil, err
928	}
929	if res.Id == nil {
930		return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.")
931	}
932	return res.Id, nil
933}
934
935// begin starts a read-write transacton on Cloud Spanner, it is always called
936// before any of the public APIs.
937func (t *ReadWriteTransaction) begin(ctx context.Context) error {
938	if t.tx != nil {
939		t.state = txActive
940		return nil
941	}
942	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
943	if err == nil {
944		t.tx = tx
945		t.state = txActive
946		return nil
947	}
948	if isSessionNotFoundError(err) {
949		t.sh.destroy()
950	}
951	return err
952}
953
954// commit tries to commit a readwrite transaction to Cloud Spanner. It also
955// returns the commit timestamp for the transactions.
956func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
957	var ts time.Time
958	t.mu.Lock()
959	t.state = txClosed // No further operations after commit.
960	mPb, err := mutationsProto(t.wb)
961	t.mu.Unlock()
962	if err != nil {
963		return ts, err
964	}
965	// In case that sessionHandle was destroyed but transaction body fails to
966	// report it.
967	sid, client := t.sh.getID(), t.sh.getClient()
968	if sid == "" || client == nil {
969		return ts, errSessionClosed(t.sh)
970	}
971
972	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
973		Session: sid,
974		Transaction: &sppb.CommitRequest_TransactionId{
975			TransactionId: t.tx,
976		},
977		Mutations: mPb,
978	})
979	if e != nil {
980		return ts, toSpannerErrorWithCommitInfo(e, true)
981	}
982	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
983		ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
984	}
985	if isSessionNotFoundError(err) {
986		t.sh.destroy()
987	}
988	return ts, err
989}
990
991// rollback is called when a commit is aborted or the transaction body runs
992// into error.
993func (t *ReadWriteTransaction) rollback(ctx context.Context) {
994	t.mu.Lock()
995	// Forbid further operations on rollbacked transaction.
996	t.state = txClosed
997	t.mu.Unlock()
998	// In case that sessionHandle was destroyed but transaction body fails to
999	// report it.
1000	sid, client := t.sh.getID(), t.sh.getClient()
1001	if sid == "" || client == nil {
1002		return
1003	}
1004	err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{
1005		Session:       sid,
1006		TransactionId: t.tx,
1007	})
1008	if isSessionNotFoundError(err) {
1009		t.sh.destroy()
1010	}
1011}
1012
1013// runInTransaction executes f under a read-write transaction context.
1014func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
1015	var (
1016		ts              time.Time
1017		err             error
1018		errDuringCommit bool
1019	)
1020	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
1021		// Try to commit if transaction body returns no error.
1022		ts, err = t.commit(ctx)
1023		errDuringCommit = err != nil
1024	}
1025	if err != nil {
1026		if isAbortErr(err) {
1027			// Retry the transaction using the same session on ABORT error.
1028			// Cloud Spanner will create the new transaction with the previous
1029			// one's wound-wait priority.
1030			return ts, err
1031		}
1032		if isSessionNotFoundError(err) {
1033			t.sh.destroy()
1034			return ts, err
1035		}
1036		// Rollback the transaction unless the error occurred during the
1037		// commit. Executing a rollback after a commit has failed will
1038		// otherwise cause an error. Note that transient errors, such as
1039		// UNAVAILABLE, are already handled in the gRPC layer and do not show
1040		// up here. Context errors (deadline exceeded / canceled) during
1041		// commits are also not rolled back.
1042		if !errDuringCommit {
1043			t.rollback(ctx)
1044		}
1045		return ts, err
1046	}
1047	// err == nil, return commit timestamp.
1048	return ts, nil
1049}
1050
// writeOnlyTransaction provides the most efficient way of doing write-only
// transactions. It essentially does blind writes to Cloud Spanner: mutations
// are committed in a single-use read-write transaction without a preceding
// BeginTransaction call (see applyAtLeastOnce).
type writeOnlyTransaction struct {
	// sp is the session pool which writeOnlyTransaction uses to get Cloud
	// Spanner sessions for blind writes.
	sp *sessionPool
}
1058
1059// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
1060// unless one of the following happens:
1061//
1062//     1) Context times out.
1063//     2) An unretryable error (e.g. database not found) occurs.
1064//     3) There is a malformed Mutation object.
1065func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
1066	var (
1067		ts time.Time
1068		sh *sessionHandle
1069	)
1070	mPb, err := mutationsProto(ms)
1071	if err != nil {
1072		// Malformed mutation found, just return the error.
1073		return ts, err
1074	}
1075
1076	// Retry-loop for aborted transactions.
1077	// TODO: Replace with generic retryer.
1078	for {
1079		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1080			// No usable session for doing the commit, take one from pool.
1081			sh, err = t.sp.take(ctx)
1082			if err != nil {
1083				// sessionPool.Take already retries for session
1084				// creations/retrivals.
1085				return ts, err
1086			}
1087			defer sh.recycle()
1088		}
1089		res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
1090			Session: sh.getID(),
1091			Transaction: &sppb.CommitRequest_SingleUseTransaction{
1092				SingleUseTransaction: &sppb.TransactionOptions{
1093					Mode: &sppb.TransactionOptions_ReadWrite_{
1094						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1095					},
1096				},
1097			},
1098			Mutations: mPb,
1099		})
1100		if err != nil && !isAbortErr(err) {
1101			if isSessionNotFoundError(err) {
1102				// Discard the bad session.
1103				sh.destroy()
1104			}
1105			return ts, toSpannerErrorWithCommitInfo(err, true)
1106		} else if err == nil {
1107			if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1108				ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1109			}
1110			break
1111		}
1112	}
1113	return ts, toSpannerError(err)
1114}
1115
1116// isAbortedErr returns true if the error indicates that an gRPC call is
1117// aborted on the server side.
1118func isAbortErr(err error) bool {
1119	if err == nil {
1120		return false
1121	}
1122	if ErrCode(err) == codes.Aborted {
1123		return true
1124	}
1125	return false
1126}
1127