1/*
2Copyright 2017 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"sync"
22	"sync/atomic"
23	"time"
24
25	"cloud.google.com/go/internal/trace"
26	vkit "cloud.google.com/go/spanner/apiv1"
27	"github.com/golang/protobuf/proto"
28	"google.golang.org/api/iterator"
29	sppb "google.golang.org/genproto/googleapis/spanner/v1"
30	"google.golang.org/grpc/codes"
31)
32
// transactionID stores a transaction ID which uniquely identifies a transaction
// in Cloud Spanner. It holds the raw ID bytes returned by a BeginTransaction
// call and is sent back to the service inside a TransactionSelector.
type transactionID []byte
36
// txReadEnv manages a read-transaction environment consisting of a session
// handle and a transaction selector.
type txReadEnv interface {
	// acquire returns a read-transaction environment that can be used to
	// perform a transactional read.
	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
	// setTimestamp sets the transaction's read timestamp.
	setTimestamp(time.Time)
	// release should be called at the end of every transactional read to deal
	// with session recycling. The implementations in this file inspect the
	// error argument for a "Session not found" failure.
	release(error)
}
49
// txReadOnly contains methods for doing transactional reads.
//
// It embeds a txReadEnv, which supplies the session handle and transaction
// selector for each read, and is itself embedded by ReadOnlyTransaction and
// ReadWriteTransaction.
type txReadOnly struct {
	// read-transaction environment for performing transactional read
	// operations.
	txReadEnv

	// Atomic. Only needed for DML statements, but used for all. It is
	// incremented once for every ExecuteSql / ExecuteBatchDml request that is
	// prepared.
	sequenceNumber int64

	// replaceSessionFunc is a function that can be called to replace the
	// session that is used by the transaction. This function should only be
	// defined for single-use transactions that can safely be retried on a
	// different session. All other transactions will set this function to nil.
	replaceSessionFunc func(ctx context.Context) error

	// sp is the session pool for allocating a session to execute the read-only
	// transaction. It is set only once during initialization of the
	// txReadOnly.
	sp *sessionPool
	// sh is the sessionHandle allocated from sp.
	sh *sessionHandle

	// qo provides options for executing a sql query. These are the defaults
	// that per-call options are merged into.
	qo QueryOptions
}
75
76// errSessionClosed returns error for using a recycled/destroyed session
77func errSessionClosed(sh *sessionHandle) error {
78	return spannerErrorf(codes.FailedPrecondition,
79		"session is already recycled / destroyed: session_id = %q, rpc_client = %v", sh.getID(), sh.getClient())
80}
81
// Read returns a RowIterator for reading multiple rows from the database.
//
// It is shorthand for ReadWithOptions with nil options, i.e. no index and no
// row limit.
func (t *txReadOnly) Read(ctx context.Context, table string, keys KeySet, columns []string) *RowIterator {
	return t.ReadWithOptions(ctx, table, keys, columns, nil)
}
86
// ReadUsingIndex calls ReadWithOptions with ReadOptions{Index: index}. It
// returns a RowIterator over the rows of table selected by keys, looked up
// through the named index, with no row limit.
func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) (ri *RowIterator) {
	return t.ReadWithOptions(ctx, table, keys, columns, &ReadOptions{Index: index})
}
91
// ReadOptions provides options for reading rows from a database. It is passed
// to ReadWithOptions; a nil *ReadOptions behaves like the zero value (no index,
// no limit).
type ReadOptions struct {
	// The index to use for reading. If non-empty, you can only read columns
	// that are part of the index key, part of the primary key, or stored in the
	// index due to a STORING clause in the index definition.
	Index string

	// The maximum number of rows to read. A limit value less than 1 means no
	// limit.
	Limit int
}
103
104// ReadWithOptions returns a RowIterator for reading multiple rows from the
105// database. Pass a ReadOptions to modify the read operation.
106func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys KeySet, columns []string, opts *ReadOptions) (ri *RowIterator) {
107	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Read")
108	defer func() { trace.EndSpan(ctx, ri.err) }()
109	var (
110		sh  *sessionHandle
111		ts  *sppb.TransactionSelector
112		err error
113	)
114	kset, err := keys.keySetProto()
115	if err != nil {
116		return &RowIterator{err: err}
117	}
118	if sh, ts, err = t.acquire(ctx); err != nil {
119		return &RowIterator{err: err}
120	}
121	// Cloud Spanner will return "Session not found" on bad sessions.
122	sid, client := sh.getID(), sh.getClient()
123	if sid == "" || client == nil {
124		// Might happen if transaction is closed in the middle of a API call.
125		return &RowIterator{err: errSessionClosed(sh)}
126	}
127	index := ""
128	limit := 0
129	if opts != nil {
130		index = opts.Index
131		if opts.Limit > 0 {
132			limit = opts.Limit
133		}
134	}
135	return stream(
136		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
137		sh.session.logger,
138		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
139			return client.StreamingRead(ctx,
140				&sppb.ReadRequest{
141					Session:     sid,
142					Transaction: ts,
143					Table:       table,
144					Index:       index,
145					Columns:     columns,
146					KeySet:      kset,
147					ResumeToken: resumeToken,
148					Limit:       int64(limit),
149				})
150		},
151		t.setTimestamp,
152		t.release,
153	)
154}
155
// errRowNotFound returns the NotFound error reported when no row of table
// matches the primary key key.
func errRowNotFound(table string, key Key) error {
	return spannerErrorf(codes.NotFound, "row not found(Table: %v, PrimaryKey: %v)", table, key)
}
161
// errRowNotFoundByIndex returns the NotFound error reported when no row of
// table matches key when looked up through index.
func errRowNotFoundByIndex(table string, key Key, index string) error {
	return spannerErrorf(codes.NotFound, "row not found(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
}
166
// errMultipleRowsFound returns the FailedPrecondition error reported when a
// single-row read through index yields more than one row for key.
func errMultipleRowsFound(table string, key Key, index string) error {
	return spannerErrorf(codes.FailedPrecondition, "more than one row found by index(Table: %v, IndexKey: %v, Index: %v)", table, key, index)
}
171
172// ReadRow reads a single row from the database.
173//
174// If no row is present with the given key, then ReadRow returns an error where
175// spanner.ErrCode(err) is codes.NotFound.
176func (t *txReadOnly) ReadRow(ctx context.Context, table string, key Key, columns []string) (*Row, error) {
177	iter := t.Read(ctx, table, key, columns)
178	defer iter.Stop()
179	row, err := iter.Next()
180	switch err {
181	case iterator.Done:
182		return nil, errRowNotFound(table, key)
183	case nil:
184		return row, nil
185	default:
186		return nil, err
187	}
188}
189
190// ReadRowUsingIndex reads a single row from the database using an index.
191//
192// If no row is present with the given index, then ReadRowUsingIndex returns an
193// error where spanner.ErrCode(err) is codes.NotFound.
194//
195// If more than one row received with the given index, then ReadRowUsingIndex
196// returns an error where spanner.ErrCode(err) is codes.FailedPrecondition.
197func (t *txReadOnly) ReadRowUsingIndex(ctx context.Context, table string, index string, key Key, columns []string) (*Row, error) {
198	iter := t.ReadUsingIndex(ctx, table, index, key, columns)
199	defer iter.Stop()
200	row, err := iter.Next()
201	switch err {
202	case iterator.Done:
203		return nil, errRowNotFoundByIndex(table, key, index)
204	case nil:
205		// If more than one row found, return an error.
206		_, err := iter.Next()
207		switch err {
208		case iterator.Done:
209			return row, nil
210		case nil:
211			return nil, errMultipleRowsFound(table, key, index)
212		default:
213			return nil, err
214		}
215	default:
216		return nil, err
217	}
218}
219
// QueryOptions provides options for executing a sql query from a database.
type QueryOptions struct {
	// Mode is the query mode sent with the request. When nil, the request is
	// built with ExecuteSqlRequest_NORMAL.
	Mode *sppb.ExecuteSqlRequest_QueryMode
	// Options is forwarded as the QueryOptions field of the ExecuteSqlRequest.
	Options *sppb.ExecuteSqlRequest_QueryOptions
}
225
226// merge combines two QueryOptions that the input parameter will have higher
227// order of precedence.
228func (qo QueryOptions) merge(opts QueryOptions) QueryOptions {
229	merged := QueryOptions{
230		Mode:    qo.Mode,
231		Options: &sppb.ExecuteSqlRequest_QueryOptions{},
232	}
233	if opts.Mode != nil {
234		merged.Mode = opts.Mode
235	}
236	proto.Merge(merged.Options, qo.Options)
237	proto.Merge(merged.Options, opts.Options)
238	return merged
239}
240
241// Query executes a query against the database. It returns a RowIterator for
242// retrieving the resulting rows.
243//
244// Query returns only row data, without a query plan or execution statistics.
245// Use QueryWithStats to get rows along with the plan and statistics. Use
246// AnalyzeQuery to get just the plan.
247func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
248	mode := sppb.ExecuteSqlRequest_NORMAL
249	return t.query(ctx, statement, QueryOptions{
250		Mode:    &mode,
251		Options: t.qo.Options,
252	})
253}
254
// QueryWithOptions executes a SQL statement against the database. It returns
// a RowIterator for retrieving the resulting rows. The sql query execution
// will be optimized based on the given query options, which are merged on top
// of the transaction's own query options (opts takes precedence).
func (t *txReadOnly) QueryWithOptions(ctx context.Context, statement Statement, opts QueryOptions) *RowIterator {
	return t.query(ctx, statement, t.qo.merge(opts))
}
261
262// QueryWithStats executes a SQL statement against the database. It returns
263// a RowIterator for retrieving the resulting rows. The RowIterator will also
264// be populated with a query plan and execution statistics.
265func (t *txReadOnly) QueryWithStats(ctx context.Context, statement Statement) *RowIterator {
266	mode := sppb.ExecuteSqlRequest_PROFILE
267	return t.query(ctx, statement, QueryOptions{
268		Mode:    &mode,
269		Options: t.qo.Options,
270	})
271}
272
273// AnalyzeQuery returns the query plan for statement.
274func (t *txReadOnly) AnalyzeQuery(ctx context.Context, statement Statement) (*sppb.QueryPlan, error) {
275	mode := sppb.ExecuteSqlRequest_PLAN
276	iter := t.query(ctx, statement, QueryOptions{
277		Mode:    &mode,
278		Options: t.qo.Options,
279	})
280	defer iter.Stop()
281	for {
282		_, err := iter.Next()
283		if err == iterator.Done {
284			break
285		}
286		if err != nil {
287			return nil, err
288		}
289	}
290	if iter.QueryPlan == nil {
291		return nil, spannerErrorf(codes.Internal, "query plan unavailable")
292	}
293	return iter.QueryPlan, nil
294}
295
296func (t *txReadOnly) query(ctx context.Context, statement Statement, options QueryOptions) (ri *RowIterator) {
297	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Query")
298	defer func() { trace.EndSpan(ctx, ri.err) }()
299	req, sh, err := t.prepareExecuteSQL(ctx, statement, options)
300	if err != nil {
301		return &RowIterator{err: err}
302	}
303	client := sh.getClient()
304	return streamWithReplaceSessionFunc(
305		contextWithOutgoingMetadata(ctx, sh.getMetadata()),
306		sh.session.logger,
307		func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) {
308			req.ResumeToken = resumeToken
309			req.Session = t.sh.getID()
310			return client.ExecuteStreamingSql(ctx, req)
311		},
312		t.replaceSessionFunc,
313		t.setTimestamp,
314		t.release)
315}
316
317func (t *txReadOnly) prepareExecuteSQL(ctx context.Context, stmt Statement, options QueryOptions) (*sppb.ExecuteSqlRequest, *sessionHandle, error) {
318	sh, ts, err := t.acquire(ctx)
319	if err != nil {
320		return nil, nil, err
321	}
322	// Cloud Spanner will return "Session not found" on bad sessions.
323	sid := sh.getID()
324	if sid == "" {
325		// Might happen if transaction is closed in the middle of a API call.
326		return nil, nil, errSessionClosed(sh)
327	}
328	params, paramTypes, err := stmt.convertParams()
329	if err != nil {
330		return nil, nil, err
331	}
332	mode := sppb.ExecuteSqlRequest_NORMAL
333	if options.Mode != nil {
334		mode = *options.Mode
335	}
336	req := &sppb.ExecuteSqlRequest{
337		Session:      sid,
338		Transaction:  ts,
339		Sql:          stmt.SQL,
340		QueryMode:    mode,
341		Seqno:        atomic.AddInt64(&t.sequenceNumber, 1),
342		Params:       params,
343		ParamTypes:   paramTypes,
344		QueryOptions: options.Options,
345	}
346	return req, sh, nil
347}
348
// txState is the status of a transaction.
type txState int

// The states a transaction moves through. txNew is the zero value.
const (
	// transaction is new, waiting to be initialized.
	txNew txState = iota
	// transaction is being initialized.
	txInit
	// transaction is active and can perform read/write.
	txActive
	// transaction is closed, cannot be used anymore.
	txClosed
)
362
// errRtsUnavailable returns the Internal error reported when a read
// transaction's read timestamp is requested before it has been set.
func errRtsUnavailable() error {
	return spannerErrorf(codes.Internal, "read timestamp is unavailable")
}
368
// errTxClosed returns the InvalidArgument error reported when a closed
// transaction is used.
func errTxClosed() error {
	return spannerErrorf(codes.InvalidArgument, "cannot use a closed transaction")
}
373
// errUnexpectedTxState returns the FailedPrecondition error reported when a
// transaction is found in a state that the current operation does not expect.
// ts is the offending state and is included in the message.
func errUnexpectedTxState(ts txState) error {
	return spannerErrorf(codes.FailedPrecondition, "unexpected transaction state: %v", ts)
}
378
379// ReadOnlyTransaction provides a snapshot transaction with guaranteed
380// consistency across reads, but does not allow writes.  Read-only transactions
381// can be configured to read at timestamps in the past.
382//
383// Read-only transactions do not take locks. Instead, they work by choosing a
384// Cloud Spanner timestamp, then executing all reads at that timestamp. Since
385// they do not acquire locks, they do not block concurrent read-write
386// transactions.
387//
388// Unlike locking read-write transactions, read-only transactions never abort.
389// They can fail if the chosen read timestamp is garbage collected; however, the
390// default garbage collection policy is generous enough that most applications
391// do not need to worry about this in practice. See the documentation of
392// TimestampBound for more details.
393//
394// A ReadOnlyTransaction consumes resources on the server until Close is called.
type ReadOnlyTransaction struct {
	// mu protects concurrent access to the internal states of ReadOnlyTransaction.
	mu sync.Mutex
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// singleUse indicates that the transaction can be used for only one read.
	singleUse bool
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadOnlyTransaction. While a multi-use transaction is being initialized
	// it holds a non-nil, empty placeholder marking that some caller owns the
	// initialization.
	tx transactionID
	// txReadyOrClosed is for broadcasting that transaction ID has been returned
	// by Cloud Spanner or that transaction is closed. Waiters must copy the
	// channel while holding mu, because begin replaces it after closing it.
	txReadyOrClosed chan struct{}
	// state is the current transaction status of the ReadOnly transaction.
	state txState
	// rts is the read timestamp returned by transactional reads.
	rts time.Time
	// tb is the read staleness bound specification for transactional reads.
	tb TimestampBound
}
415
// errTxInitTimeout returns the Canceled error reported when the caller's
// context is done (timeout or cancellation) while waiting for the transaction
// to be initialized.
func errTxInitTimeout() error {
	return spannerErrorf(codes.Canceled, "timeout/context canceled in waiting for transaction's initialization")
}
421
422// getTimestampBound returns the read staleness bound specified for the
423// ReadOnlyTransaction.
424func (t *ReadOnlyTransaction) getTimestampBound() TimestampBound {
425	t.mu.Lock()
426	defer t.mu.Unlock()
427	return t.tb
428}
429
430// begin starts a snapshot read-only Transaction on Cloud Spanner.
431func (t *ReadOnlyTransaction) begin(ctx context.Context) error {
432	var (
433		locked bool
434		tx     transactionID
435		rts    time.Time
436		sh     *sessionHandle
437		err    error
438		res    *sppb.Transaction
439	)
440	defer func() {
441		if !locked {
442			t.mu.Lock()
443			// Not necessary, just to make it clear that t.mu is being held when
444			// locked == true.
445			locked = true
446		}
447		if t.state != txClosed {
448			// Signal other initialization routines.
449			close(t.txReadyOrClosed)
450			t.txReadyOrClosed = make(chan struct{})
451		}
452		t.mu.Unlock()
453		if err != nil && sh != nil {
454			// Got a valid session handle, but failed to initialize transaction=
455			// on Cloud Spanner.
456			if isSessionNotFoundError(err) {
457				sh.destroy()
458			}
459			// If sh.destroy was already executed, this becomes a noop.
460			sh.recycle()
461		}
462	}()
463	// Retry the BeginTransaction call if a 'Session not found' is returned.
464	for {
465		sh, err = t.sp.take(ctx)
466		if err != nil {
467			return err
468		}
469		res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
470			Session: sh.getID(),
471			Options: &sppb.TransactionOptions{
472				Mode: &sppb.TransactionOptions_ReadOnly_{
473					ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true),
474				},
475			},
476		})
477		if isSessionNotFoundError(err) {
478			sh.destroy()
479			continue
480		} else if err == nil {
481			tx = res.Id
482			if res.ReadTimestamp != nil {
483				rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
484			}
485		} else {
486			err = toSpannerError(err)
487		}
488		break
489	}
490	t.mu.Lock()
491
492	// defer function will be executed with t.mu being held.
493	locked = true
494
495	// During the execution of t.begin(), t.Close() was invoked.
496	if t.state == txClosed {
497		return errSessionClosed(sh)
498	}
499
500	// If begin() fails, this allows other queries to take over the
501	// initialization.
502	t.tx = nil
503	if err == nil {
504		t.tx = tx
505		t.rts = rts
506		t.sh = sh
507		// State transite to txActive.
508		t.state = txActive
509	}
510	return err
511}
512
513// acquire implements txReadEnv.acquire.
514func (t *ReadOnlyTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
515	if err := checkNestedTxn(ctx); err != nil {
516		return nil, nil, err
517	}
518	if t.singleUse {
519		return t.acquireSingleUse(ctx)
520	}
521	return t.acquireMultiUse(ctx)
522}
523
524func (t *ReadOnlyTransaction) acquireSingleUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
525	t.mu.Lock()
526	defer t.mu.Unlock()
527	switch t.state {
528	case txClosed:
529		// A closed single-use transaction can never be reused.
530		return nil, nil, errTxClosed()
531	case txNew:
532		t.state = txClosed
533		ts := &sppb.TransactionSelector{
534			Selector: &sppb.TransactionSelector_SingleUse{
535				SingleUse: &sppb.TransactionOptions{
536					Mode: &sppb.TransactionOptions_ReadOnly_{
537						ReadOnly: buildTransactionOptionsReadOnly(t.tb, true),
538					},
539				},
540			},
541		}
542		sh, err := t.sp.take(ctx)
543		if err != nil {
544			return nil, nil, err
545		}
546
547		// Install session handle into t, which can be used for readonly
548		// operations later.
549		t.sh = sh
550		return sh, ts, nil
551	}
552	us := t.state
553
554	// SingleUse transaction should only be in either txNew state or txClosed
555	// state.
556	return nil, nil, errUnexpectedTxState(us)
557}
558
// acquireMultiUse returns the read environment for a multi-use read-only
// transaction, starting the transaction on Cloud Spanner on first use.
//
// It loops over the transaction state, re-checking it under t.mu on every
// iteration:
//   - txClosed: fail with errTxClosed.
//   - txNew: move to txInit and re-check.
//   - txInit: the caller that finds t.tx == nil claims the initialization and
//     runs begin; every other caller waits until txReadyOrClosed is closed or
//     ctx is done.
//   - txActive: return t.sh and a selector carrying t.tx.
//
// Any other state is reported through errUnexpectedTxState.
func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
	for {
		t.mu.Lock()
		switch t.state {
		case txClosed:
			t.mu.Unlock()
			return nil, nil, errTxClosed()
		case txNew:
			// State transit to txInit so that no further TimestampBound change
			// is accepted.
			t.state = txInit
			t.mu.Unlock()
			continue
		case txInit:
			if t.tx != nil {
				// Wait for a transaction ID to become ready. The channel is
				// copied while holding the lock because begin replaces it.
				txReadyOrClosed := t.txReadyOrClosed
				t.mu.Unlock()
				select {
				case <-txReadyOrClosed:
					// Need to check transaction state again.
					continue
				case <-ctx.Done():
					// The waiting for initialization is timeout, return error
					// directly.
					return nil, nil, errTxInitTimeout()
				}
			}
			// Take the ownership of initializing the transaction. The empty,
			// non-nil ID makes concurrent callers take the waiting branch
			// above.
			t.tx = transactionID{}
			t.mu.Unlock()
			// Begin a read-only transaction.
			//
			// TODO: consider adding a transaction option which allow queries to
			//  initiate transactions by themselves. Note that this option might
			//  not be always good because the ID of the new transaction won't
			//  be ready till the query returns some data or completes.
			if err := t.begin(ctx); err != nil {
				return nil, nil, err
			}

			// If t.begin() succeeded, t.state should have been changed to
			// txActive, so we can just continue here.
			continue
		case txActive:
			sh := t.sh
			ts := &sppb.TransactionSelector{
				Selector: &sppb.TransactionSelector_Id{
					Id: t.tx,
				},
			}
			t.mu.Unlock()
			return sh, ts, nil
		}
		// Reached only for a state not handled above; t.mu is still held here.
		state := t.state
		t.mu.Unlock()
		return nil, nil, errUnexpectedTxState(state)
	}
}
618
619func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
620	t.mu.Lock()
621	defer t.mu.Unlock()
622	if t.rts.IsZero() {
623		t.rts = ts
624	}
625}
626
627// release implements txReadEnv.release.
628func (t *ReadOnlyTransaction) release(err error) {
629	t.mu.Lock()
630	sh := t.sh
631	t.mu.Unlock()
632	if sh != nil { // sh could be nil if t.acquire() fails.
633		if isSessionNotFoundError(err) {
634			sh.destroy()
635		}
636		if t.singleUse {
637			// If session handle is already destroyed, this becomes a noop.
638			sh.recycle()
639		}
640	}
641}
642
643// Close closes a ReadOnlyTransaction, the transaction cannot perform any reads
644// after being closed.
645func (t *ReadOnlyTransaction) Close() {
646	if t.singleUse {
647		return
648	}
649	t.mu.Lock()
650	if t.state != txClosed {
651		t.state = txClosed
652		close(t.txReadyOrClosed)
653	}
654	sh := t.sh
655	t.mu.Unlock()
656	if sh == nil {
657		return
658	}
659	// If session handle is already destroyed, this becomes a noop. If there are
660	// still active queries and if the recycled session is reused before they
661	// complete, Cloud Spanner will cancel them on behalf of the new transaction
662	// on the session.
663	if sh != nil {
664		sh.recycle()
665	}
666}
667
668// Timestamp returns the timestamp chosen to perform reads and queries in this
669// transaction. The value can only be read after some read or query has either
670// returned some data or completed without returning any data.
671func (t *ReadOnlyTransaction) Timestamp() (time.Time, error) {
672	t.mu.Lock()
673	defer t.mu.Unlock()
674	if t.rts.IsZero() {
675		return t.rts, errRtsUnavailable()
676	}
677	return t.rts, nil
678}
679
680// WithTimestampBound specifies the TimestampBound to use for read or query.
681// This can only be used before the first read or query is invoked. Note:
682// bounded staleness is not available with general ReadOnlyTransactions; use a
683// single-use ReadOnlyTransaction instead.
684//
685// The returned value is the ReadOnlyTransaction so calls can be chained.
686func (t *ReadOnlyTransaction) WithTimestampBound(tb TimestampBound) *ReadOnlyTransaction {
687	t.mu.Lock()
688	defer t.mu.Unlock()
689	if t.state == txNew {
690		// Only allow to set TimestampBound before the first query.
691		t.tb = tb
692	}
693	return t
694}
695
696// ReadWriteTransaction provides a locking read-write transaction.
697//
698// This type of transaction is the only way to write data into Cloud Spanner;
699// (*Client).Apply, (*Client).ApplyAtLeastOnce, (*Client).PartitionedUpdate use
700// transactions internally. These transactions rely on pessimistic locking and,
701// if necessary, two-phase commit. Locking read-write transactions may abort,
702// requiring the application to retry. However, the interface exposed by
703// (*Client).ReadWriteTransaction eliminates the need for applications to write
704// retry loops explicitly.
705//
706// Locking transactions may be used to atomically read-modify-write data
707// anywhere in a database. This type of transaction is externally consistent.
708//
709// Clients should attempt to minimize the amount of time a transaction is
710// active. Faster transactions commit with higher probability and cause less
711// contention. Cloud Spanner attempts to keep read locks active as long as the
712// transaction continues to do reads.  Long periods of inactivity at the client
713// may cause Cloud Spanner to release a transaction's locks and abort it.
714//
715// Reads performed within a transaction acquire locks on the data being
716// read. Writes can only be done at commit time, after all reads have been
717// completed. Conceptually, a read-write transaction consists of zero or more
718// reads or SQL queries followed by a commit.
719//
720// See (*Client).ReadWriteTransaction for an example.
721//
722// Semantics
723//
724// Cloud Spanner can commit the transaction if all read locks it acquired are
725// still valid at commit time, and it is able to acquire write locks for all
726// writes. Cloud Spanner can abort the transaction for any reason. If a commit
727// attempt returns ABORTED, Cloud Spanner guarantees that the transaction has
728// not modified any user data in Cloud Spanner.
729//
730// Unless the transaction commits, Cloud Spanner makes no guarantees about how
731// long the transaction's locks were held for. It is an error to use Cloud
732// Spanner locks for any sort of mutual exclusion other than between Cloud
733// Spanner transactions themselves.
734//
735// Aborted transactions
736//
737// Application code does not need to retry explicitly; RunInTransaction will
738// automatically retry a transaction if an attempt results in an abort. The lock
739// priority of a transaction increases after each prior aborted transaction,
740// meaning that the next attempt has a slightly better chance of success than
741// before.
742//
743// Under some circumstances (e.g., many transactions attempting to modify the
744// same row(s)), a transaction can abort many times in a short period before
745// successfully committing. Thus, it is not a good idea to cap the number of
746// retries a transaction can attempt; instead, it is better to limit the total
747// amount of wall time spent retrying.
748//
749// Idle transactions
750//
751// A transaction is considered idle if it has no outstanding reads or SQL
752// queries and has not started a read or SQL query within the last 10
753// seconds. Idle transactions can be aborted by Cloud Spanner so that they don't
754// hold on to locks indefinitely. In that case, the commit will fail with error
755// ABORTED.
756//
757// If this behavior is undesirable, periodically executing a simple SQL query
758// in the transaction (e.g., SELECT 1) prevents the transaction from becoming
759// idle.
type ReadWriteTransaction struct {
	// txReadOnly contains methods for performing transactional reads.
	txReadOnly
	// tx is the transaction ID in Cloud Spanner that uniquely identifies the
	// ReadWriteTransaction. It is set only once in ReadWriteTransaction.begin()
	// during the initialization of ReadWriteTransaction.
	tx transactionID
	// mu protects concurrent access to the internal states of
	// ReadWriteTransaction.
	mu sync.Mutex
	// state is the current transaction status of the read-write transaction.
	state txState
	// wb is the set of buffered mutations waiting to be committed. BufferWrite
	// appends to it.
	wb []*Mutation
}
775
776// BufferWrite adds a list of mutations to the set of updates that will be
777// applied when the transaction is committed. It does not actually apply the
778// write until the transaction is committed, so the operation does not block.
779// The effects of the write won't be visible to any reads (including reads done
780// in the same transaction) until the transaction commits.
781//
782// See the example for Client.ReadWriteTransaction.
783func (t *ReadWriteTransaction) BufferWrite(ms []*Mutation) error {
784	t.mu.Lock()
785	defer t.mu.Unlock()
786	if t.state == txClosed {
787		return errTxClosed()
788	}
789	if t.state != txActive {
790		return errUnexpectedTxState(t.state)
791	}
792	t.wb = append(t.wb, ms...)
793	return nil
794}
795
796// Update executes a DML statement against the database. It returns the number
797// of affected rows. Update returns an error if the statement is a query.
798// However, the query is executed, and any data read will be validated upon
799// commit.
800func (t *ReadWriteTransaction) Update(ctx context.Context, stmt Statement) (rowCount int64, err error) {
801	mode := sppb.ExecuteSqlRequest_NORMAL
802	return t.update(ctx, stmt, QueryOptions{
803		Mode:    &mode,
804		Options: t.qo.Options,
805	})
806}
807
// UpdateWithOptions executes a DML statement against the database. It returns
// the number of affected rows. The sql query execution will be optimized
// based on the given query options, which are merged on top of the
// transaction's own query options (opts takes precedence).
func (t *ReadWriteTransaction) UpdateWithOptions(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
	return t.update(ctx, stmt, t.qo.merge(opts))
}
814
815func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts QueryOptions) (rowCount int64, err error) {
816	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.Update")
817	defer func() { trace.EndSpan(ctx, err) }()
818	req, sh, err := t.prepareExecuteSQL(ctx, stmt, opts)
819	if err != nil {
820		return 0, err
821	}
822	resultSet, err := sh.getClient().ExecuteSql(ctx, req)
823	if err != nil {
824		return 0, toSpannerError(err)
825	}
826	if resultSet.Stats == nil {
827		return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", stmt.SQL)
828	}
829	return extractRowCount(resultSet.Stats)
830}
831
832// BatchUpdate groups one or more DML statements and sends them to Spanner in a
833// single RPC. This is an efficient way to execute multiple DML statements.
834//
835// A slice of counts is returned, where each count represents the number of
836// affected rows for the given query at the same index. If an error occurs,
837// counts will be returned up to the query that encountered the error.
838func (t *ReadWriteTransaction) BatchUpdate(ctx context.Context, stmts []Statement) (_ []int64, err error) {
839	ctx = trace.StartSpan(ctx, "cloud.google.com/go/spanner.BatchUpdate")
840	defer func() { trace.EndSpan(ctx, err) }()
841
842	sh, ts, err := t.acquire(ctx)
843	if err != nil {
844		return nil, err
845	}
846	// Cloud Spanner will return "Session not found" on bad sessions.
847	sid := sh.getID()
848	if sid == "" {
849		// Might happen if transaction is closed in the middle of a API call.
850		return nil, errSessionClosed(sh)
851	}
852
853	var sppbStmts []*sppb.ExecuteBatchDmlRequest_Statement
854	for _, st := range stmts {
855		params, paramTypes, err := st.convertParams()
856		if err != nil {
857			return nil, err
858		}
859		sppbStmts = append(sppbStmts, &sppb.ExecuteBatchDmlRequest_Statement{
860			Sql:        st.SQL,
861			Params:     params,
862			ParamTypes: paramTypes,
863		})
864	}
865
866	resp, err := sh.getClient().ExecuteBatchDml(ctx, &sppb.ExecuteBatchDmlRequest{
867		Session:     sh.getID(),
868		Transaction: ts,
869		Statements:  sppbStmts,
870		Seqno:       atomic.AddInt64(&t.sequenceNumber, 1),
871	})
872	if err != nil {
873		return nil, toSpannerError(err)
874	}
875
876	var counts []int64
877	for _, rs := range resp.ResultSets {
878		count, err := extractRowCount(rs.Stats)
879		if err != nil {
880			return nil, err
881		}
882		counts = append(counts, count)
883	}
884	if resp.Status.Code != 0 {
885		return counts, spannerErrorf(codes.Code(uint32(resp.Status.Code)), resp.Status.Message)
886	}
887	return counts, nil
888}
889
890// acquire implements txReadEnv.acquire.
891func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
892	ts := &sppb.TransactionSelector{
893		Selector: &sppb.TransactionSelector_Id{
894			Id: t.tx,
895		},
896	}
897	t.mu.Lock()
898	defer t.mu.Unlock()
899	switch t.state {
900	case txClosed:
901		return nil, nil, errTxClosed()
902	case txActive:
903		return t.sh, ts, nil
904	}
905	return nil, nil, errUnexpectedTxState(t.state)
906}
907
908// release implements txReadEnv.release.
909func (t *ReadWriteTransaction) release(err error) {
910	t.mu.Lock()
911	sh := t.sh
912	t.mu.Unlock()
913	if sh != nil && isSessionNotFoundError(err) {
914		sh.destroy()
915	}
916}
917
918func beginTransaction(ctx context.Context, sid string, client *vkit.Client) (transactionID, error) {
919	res, err := client.BeginTransaction(ctx, &sppb.BeginTransactionRequest{
920		Session: sid,
921		Options: &sppb.TransactionOptions{
922			Mode: &sppb.TransactionOptions_ReadWrite_{
923				ReadWrite: &sppb.TransactionOptions_ReadWrite{},
924			},
925		},
926	})
927	if err != nil {
928		return nil, err
929	}
930	if res.Id == nil {
931		return nil, spannerErrorf(codes.Unknown, "BeginTransaction returned a transaction with a nil ID.")
932	}
933	return res.Id, nil
934}
935
936// begin starts a read-write transacton on Cloud Spanner, it is always called
937// before any of the public APIs.
938func (t *ReadWriteTransaction) begin(ctx context.Context) error {
939	if t.tx != nil {
940		t.state = txActive
941		return nil
942	}
943	tx, err := beginTransaction(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), t.sh.getID(), t.sh.getClient())
944	if err == nil {
945		t.tx = tx
946		t.state = txActive
947		return nil
948	}
949	if isSessionNotFoundError(err) {
950		t.sh.destroy()
951	}
952	return err
953}
954
955// commit tries to commit a readwrite transaction to Cloud Spanner. It also
956// returns the commit timestamp for the transactions.
957func (t *ReadWriteTransaction) commit(ctx context.Context) (time.Time, error) {
958	var ts time.Time
959	t.mu.Lock()
960	t.state = txClosed // No further operations after commit.
961	mPb, err := mutationsProto(t.wb)
962	t.mu.Unlock()
963	if err != nil {
964		return ts, err
965	}
966	// In case that sessionHandle was destroyed but transaction body fails to
967	// report it.
968	sid, client := t.sh.getID(), t.sh.getClient()
969	if sid == "" || client == nil {
970		return ts, errSessionClosed(t.sh)
971	}
972
973	res, e := client.Commit(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.CommitRequest{
974		Session: sid,
975		Transaction: &sppb.CommitRequest_TransactionId{
976			TransactionId: t.tx,
977		},
978		Mutations: mPb,
979	})
980	if e != nil {
981		return ts, toSpannerErrorWithCommitInfo(e, true)
982	}
983	if tstamp := res.GetCommitTimestamp(); tstamp != nil {
984		ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
985	}
986	if isSessionNotFoundError(err) {
987		t.sh.destroy()
988	}
989	return ts, err
990}
991
992// rollback is called when a commit is aborted or the transaction body runs
993// into error.
994func (t *ReadWriteTransaction) rollback(ctx context.Context) {
995	t.mu.Lock()
996	// Forbid further operations on rollbacked transaction.
997	t.state = txClosed
998	t.mu.Unlock()
999	// In case that sessionHandle was destroyed but transaction body fails to
1000	// report it.
1001	sid, client := t.sh.getID(), t.sh.getClient()
1002	if sid == "" || client == nil {
1003		return
1004	}
1005	err := client.Rollback(contextWithOutgoingMetadata(ctx, t.sh.getMetadata()), &sppb.RollbackRequest{
1006		Session:       sid,
1007		TransactionId: t.tx,
1008	})
1009	if isSessionNotFoundError(err) {
1010		t.sh.destroy()
1011	}
1012}
1013
1014// runInTransaction executes f under a read-write transaction context.
1015func (t *ReadWriteTransaction) runInTransaction(ctx context.Context, f func(context.Context, *ReadWriteTransaction) error) (time.Time, error) {
1016	var (
1017		ts              time.Time
1018		err             error
1019		errDuringCommit bool
1020	)
1021	if err = f(context.WithValue(ctx, transactionInProgressKey{}, 1), t); err == nil {
1022		// Try to commit if transaction body returns no error.
1023		ts, err = t.commit(ctx)
1024		errDuringCommit = err != nil
1025	}
1026	if err != nil {
1027		if isAbortErr(err) {
1028			// Retry the transaction using the same session on ABORT error.
1029			// Cloud Spanner will create the new transaction with the previous
1030			// one's wound-wait priority.
1031			return ts, err
1032		}
1033		if isSessionNotFoundError(err) {
1034			t.sh.destroy()
1035			return ts, err
1036		}
1037		// Rollback the transaction unless the error occurred during the
1038		// commit. Executing a rollback after a commit has failed will
1039		// otherwise cause an error. Note that transient errors, such as
1040		// UNAVAILABLE, are already handled in the gRPC layer and do not show
1041		// up here. Context errors (deadline exceeded / canceled) during
1042		// commits are also not rolled back.
1043		if !errDuringCommit {
1044			t.rollback(ctx)
1045		}
1046		return ts, err
1047	}
1048	// err == nil, return commit timestamp.
1049	return ts, nil
1050}
1051
1052// writeOnlyTransaction provides the most efficient way of doing write-only
1053// transactions. It essentially does blind writes to Cloud Spanner.
1054type writeOnlyTransaction struct {
1055	// sp is the session pool which writeOnlyTransaction uses to get Cloud
1056	// Spanner sessions for blind writes.
1057	sp *sessionPool
1058}
1059
1060// applyAtLeastOnce commits a list of mutations to Cloud Spanner at least once,
1061// unless one of the following happens:
1062//
1063//     1) Context times out.
1064//     2) An unretryable error (e.g. database not found) occurs.
1065//     3) There is a malformed Mutation object.
1066func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Mutation) (time.Time, error) {
1067	var (
1068		ts time.Time
1069		sh *sessionHandle
1070	)
1071	mPb, err := mutationsProto(ms)
1072	if err != nil {
1073		// Malformed mutation found, just return the error.
1074		return ts, err
1075	}
1076
1077	// Retry-loop for aborted transactions.
1078	// TODO: Replace with generic retryer.
1079	for {
1080		if sh == nil || sh.getID() == "" || sh.getClient() == nil {
1081			// No usable session for doing the commit, take one from pool.
1082			sh, err = t.sp.take(ctx)
1083			if err != nil {
1084				// sessionPool.Take already retries for session
1085				// creations/retrivals.
1086				return ts, err
1087			}
1088			defer sh.recycle()
1089		}
1090		res, err := sh.getClient().Commit(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.CommitRequest{
1091			Session: sh.getID(),
1092			Transaction: &sppb.CommitRequest_SingleUseTransaction{
1093				SingleUseTransaction: &sppb.TransactionOptions{
1094					Mode: &sppb.TransactionOptions_ReadWrite_{
1095						ReadWrite: &sppb.TransactionOptions_ReadWrite{},
1096					},
1097				},
1098			},
1099			Mutations: mPb,
1100		})
1101		if err != nil && !isAbortErr(err) {
1102			if isSessionNotFoundError(err) {
1103				// Discard the bad session.
1104				sh.destroy()
1105			}
1106			return ts, toSpannerErrorWithCommitInfo(err, true)
1107		} else if err == nil {
1108			if tstamp := res.GetCommitTimestamp(); tstamp != nil {
1109				ts = time.Unix(tstamp.Seconds, int64(tstamp.Nanos))
1110			}
1111			break
1112		}
1113	}
1114	return ts, toSpannerError(err)
1115}
1116
1117// isAbortedErr returns true if the error indicates that an gRPC call is
1118// aborted on the server side.
1119func isAbortErr(err error) bool {
1120	if err == nil {
1121		return false
1122	}
1123	if ErrCode(err) == codes.Aborted {
1124		return true
1125	}
1126	return false
1127}
1128